Tag Archives: Amazon EMR

Build an AWS Well-Architected environment with the Analytics Lens

Post Syndicated from Nikki Rouda original https://aws.amazon.com/blogs/big-data/build-an-aws-well-architected-environment-with-the-analytics-lens/

Building a modern data platform on AWS enables you to collect data of all types, store it in a central, secure repository, and analyze it with purpose-built tools. Yet you may be unsure of how to get started and the impact of certain design decisions. To address the need to provide advice tailored to specific technology and application domains, AWS added the concept of well-architected lenses 2017. AWS now is happy to announce the Analytics Lens for the AWS Well-Architected Framework. This post provides an introduction of its purpose, topics covered, common scenarios, and services included.

The new Analytics Lens offers comprehensive guidance to make sure that your analytics applications are designed in accordance with AWS best practices. The goal is to give you a consistent way to design and evaluate cloud architectures, based on the following five pillars:

  • Operational excellence
  • Security
  • Reliability
  • Performance efficiency
  • Cost optimization

The tool can help you assess the analytics workloads you have deployed in AWS by identifying potential risks and offering suggestions for improvements.

Using the Analytics Lens to address common requirements

The Analytics Lens models both the data architecture at the core of the analytics applications and the application behavior itself. These models are organized into the following six areas, which encompass the vast majority of analytics workloads deployed on AWS:

  1. Data ingestion
  2. Security and governance
  3. Catalog and search
  4. Central storage
  5. Processing and analytics
  6. User access

The following diagram illustrates these areas and their related AWS services.

There are a number of common scenarios where the Analytics Lens applies, such as the following:

  • Building a data lake as the foundation for your data and analytics initiatives
  • Efficient batch data processing at scale
  • Building a platform for streaming ingest and real-time event processing
  • Handling big data processing and streaming
  • Data-preparation operations

Whichever of these scenarios fits your needs, building to the principles of the Analytics Lens in the AWS Well-Architected Framework can help you implement best practices for success.

The Analytics Lens explains when and how to use the core services in the AWS analytics portfolio. These include Amazon Kinesis, Amazon Redshift, Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation. It also explains how Amazon Simple Storage Service (Amazon S3) can serve as the storage for your data lake and how to integrate with relevant AWS security services. With reference architectures, best practices advice, and answers to common questions, the Analytics Lens can help you make the right design decisions.

Conclusion

Applying the lens to your existing architectures can validate the stability and efficiency of your design (or provide recommendations to address the gaps that are identified). AWS is committed to the Analytics Lens as a living tool; as the analytics landscape evolves and new AWS services come on line, we’ll update the Analytics Lens appropriately. Our mission will always be to help you design and deploy well-architected applications.

For more information about building your own Well-Architected environment using the Analytics Lens, see the Analytics Lens whitepaper.

Special thanks to the following individuals who contributed to building this resource, among many others who helped with review and implementation: Radhika Ravirala, Laith Al-Saadoon, Wallace Printz, Ujjwal Ratan, and Neil Mukerje.

Are there questions you’d like to see answered in the tool? Share your thoughts and questions in the comments.

 


About the Authors

Nikki Rouda is the principal product marketing manager for data lakes and big data at Amazon Web Services. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.

 

 


Radhika Ravirala is a specialist solutions architect at Amazon Web Services, where she helps customers craft distributed analytics applications on the AWS platform. Prior to her cloud journey, she worked as a software engineer and designer for technology companies in Silicon Valley.

AWS Architecture Monthly Magazine: Education

Post Syndicated from Annik Stahl original https://aws.amazon.com/blogs/architecture/aws-architecture-monthly-magazine-education/

Young man sitting on a stack of books with his laptopOne of the missions of the education industry is to educate the next generation of the industry-ready workforce. Whether K-12, higher education, or continuing education, enabling teachers and professors to effectively deliver curriculum and improve student performance is a goal of Education Technology (EdTech) and learning companies. Two trends for AWS use cases in education are: 1) accessible remote learning; and 2) remote collaboration. For brevity, there are other innovation trend areas in education that we didn’t focus on in our “Ask an Expert” interview despite their importance. Use cases around learning accessibility, student performance, and campus experience have taken advantage of Amazon Alexa, Amazon Lex, and a variety of AWS technology areas including artificial intelligence (AI) and machine learning, data lakes, analytics, and mobile development. To dive deep into a wider range of education use cases, we invite everyone to look at our AWS Education blog.

In this month’s issue

For May’s Education issue, we asked our expert, Yuriko Horvath, about general architecture patterns in the education space as well as what education customers need to think about and ask themselves before considering AWS.

  • Ask an Expert: Yuriko Horvath, AWS Manager of Education for Solutions Architecture
  • Blog: How to Build a Chatbot for Your School in Less Than an Hour (with step-by-step video instructions)
  • Case Study: Virginia Tech: Building Modern Analytics on Amazon Web Services
  • Solution: Video on Demand on AWS
  • Whitepaper: Teaching Big Data Skills with Amazon EMR

How to access the magazine

We hope you’re enjoying Architecture Monthly, and we’d like to hear from you—leave us star rating and comment on the Amazon Kindle Newsstand page or contact us anytime at [email protected].

Build an automatic data profiling and reporting solution with Amazon EMR, AWS Glue, and Amazon QuickSight

Post Syndicated from Francesco Marelli original https://aws.amazon.com/blogs/big-data/build-an-automatic-data-profiling-and-reporting-solution-with-amazon-emr-aws-glue-and-amazon-quicksight/

In typical analytics pipelines, one of the first tasks that you typically perform after importing data into your data lakes is data profiling and high-level data quality analysis to check the content of the datasets. In this way, you can enrich the basic metadata that contains information such as table and column names and their types.

The results of data profiling help you determine whether the datasets contain the expected information and how to use them downstream in your analytics pipeline. Moreover, you can use these results as one of the inputs to an optional data semantics analysis stage.

The great quantity and variety of data in modern data lakes make unstructured manual data profiling and data semantics analysis impractical and time-consuming. This post shows how to implement a process for the automatic creation of a data profiling repository, as an extension of AWS Glue Data Catalog metadata, and a reporting system that can help you in your analytics pipeline design process and by providing a reliable tool for further analysis.

This post describes in detail the application Data Profiler for AWS Glue Data Catalog and provides step-by-step instructions of an example implementation.

Overview and architecture

The following diagram illustrates the architecture of this solution.

Data Profiler for AWS Glue Data Catalog is an Apache Spark Scala application that profiles all the tables defined in a database in the Data Catalog using the profiling capabilities of the Amazon Deequ library and saves the results in the Data Catalog and an Amazon S3 bucket in a partitioned Parquet format. You can use other analytics services such as Amazon Athena and Amazon QuickSight to query and visualize the data.

For more information about the Amazon Deequ data library, see Test data quality at scale with Deequ or the source code on the GitHub repo.

Metadata can be defined as data about data. Metadata for a table contains information like the table name and other attributes, column names and types, and the physical location of the files that contain the data. The Data Catalog is the metadata repository in AWS, and you can use it with other AWS services like Athena, Amazon EMR, and Amazon Redshift.

After you create or update the metadata for tables in a database (for example, adding new data to the table), either with an AWS Glue crawler or manually, you can run the application to profile each table. The results are stored as new versions of the tables’ metadata in the Data Catalog, which you can view interactively via the AWS Lake Formation console or query programmatically via the AWS CLI for AWS Glue.

For more information about the Data Profiler, see the GitHub repo.

The Deequ library does not support tables with nested data (such as JSON). If you want to run the application on a table with nested data, this must be un-nested/flattened or relationalized before profiling. For more information about useful transforms for this task, see AWS Glue Scala DynamicFrame APIs or AWS Glue PySpark Transforms Reference.

The following table shows the profiling metrics the application computes for column data types Text and Numeric. The computation of some profiling metrics for Text columns can be costly and is disabled by default. You can enable it by setting the compExp input parameter to true (see next section).

MetricDescriptionData Type
ApproxCountDistinctApproximate number of distinct values, computed with HyperLogLogPlusPlus sketches.Text / Numeric
CompletenessFraction of non-null values in a column.Text / Numeric
DistinctnessFraction of distinct values of a column over the number of all values of a column. Distinct values occur at least one time. For example, [a, a, b] contains two distinct values a and b, so distinctness is 2/3.Text / Numeric
MaxLengthMaximum length of the column.Text
MinLengthMinimum length of the column.Text
CountDistinctExact number of distinct values.Text
EntropyEntropy is a measure of the level of information contained in an event (value in a column) when considering all possible events (values in a column). It is measured in nats (natural units of information). Entropy is estimated using observed value counts as the negative sum of (value_count/total_count) * log(value_count/total_count). For example, [a, b, b, c, c] has three distinct values with counts [1, 2, 2]. Entropy is then (-1/5*log(1/5)-2/5*log(2/5)-2/5*log(2/5)) = 1.055.Text
HistogramThe summary of values in a column of a table. Groups the given column’s values and calculates the number of rows with that specific value and the fraction of this value.Text
UniqueValueRatioFraction of unique values over the number of all distinct values of a column. Unique values occur exactly one time; distinct values occur at least one time. Example: [a, a, b] contains one unique value b, and two distinct values a and b, so the unique value ratio is 1/2.Text
UniquenessFraction of unique values over the number of all values of a column. Unique values occur exactly one time. Example: [a, a, b] contains one unique value b, so uniqueness is 1/3.Text
ApproxQuantilesApproximate quantiles of a distribution.Numeric
MaximumMaximum value of the column.Numeric
MeanMean value of the column.Numeric
MinimumMinimum value of the column.Numeric
StandardDeviationStandard deviation value of the column.Numeric
SumSum of the column.Numeric

Application description

You can run the application via spark-submit on a transient or permanent EMR cluster (see the “Creating an EMR cluster” section in this post for minimum version specification) with Spark installed and configured with the Data Catalog settings option Use for Spark table metadata enabled.

The following example code executes the application:

 $ spark-submit \
  --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
  --master yarn \
  --deploy-mode cluster \
  --name data-profiler-for-aws-glue-data-catalog \
  /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
  --dbName nyctlcdb \
  --region eu-west-1 \
  --compExp true \
  --statsPrefix DQP \
  --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
  --profileUnsupportedTypes true \
  --noOfBins 30 \
  --quantiles 10

The following table summarizes the input parameters that the application accepts.

NameTypeRequiredDefaultDescription
--dbName (-d)StringYesN/AData Catalog database name. The database must be defined in the Catalog owned by the same account where the application is executed.
--region (-r)StringYesN/AAWS Region endpoint where the Data Catalog database is defined, for example us-west-1 or us-east-1. For more information, see Regional Endpoints.
--compExp (-c)BooleanNofalseIf true, the application also executes “expensive” profiling analyzers on Text columns. These are CountDistinct, Entropy, Histogram, UniqueValueRatio, and Uniqueness. If false, only the following default analyzers are executed: ApproxCountDistinct, Completeness, Distinctness, MaxLength, MinLength. All analyzers for Numeric columns are always executed.
--statsPrefix (-p)StringNoDQPString prepended to the statistics names in the Data Catalog. The application also adds two underscores (__). This is useful to identify metrics calculated by the application.
--s3BucketPrefix (-s)StringNoblankFormat must be s3Buckename/prefix. If specified, the application writes Parquet files with metrics in the prefixes db_name=…/table_name=….
--profileUnsupportedTypes (-u)BooleanNofalseBy default, the Amazon Deequ library only supports Text and Numeric columns. If this parameter is set to true, the application also profiles columns of type Boolean and Date.
--noOfBins (-b)IntegerNo10When --compExp (-c) is true, sets the number of maximum values to create for the Histogram analyzer for String columns.
--quantiles (-q)IntegerNo10Sets the number of quantiles to calculate for the ApproxQuantiles analyzer for numeric columns.

Setting up your environment

The following walkthrough demonstrates how to create and populate a Data Catalog database with three tables, which simulates a process with monthly updates. For this post, you simulate three monthly runs: February 2, 2019, March 2, 2019, and April 2, 2019.

After table creation and after each monthly table update, you run the application to generate and update the profiling information for each table in the Data Catalog and Amazon S3 repository. The post also provides examples of how to query the data using the AWS CLI and Athena, and build a simple Amazon QuickSight dashboard.

This post uses the New York City Taxi and Limousine Commission (TLC) Trip Record Data on the Registry of Open Data on AWS. In particular, you use the tables yellow_tripdata, fhv_tripdata, and green_tripdata.

The following steps explain how to set up the environment.

Creating an EMR cluster

The first step is to create an EMR cluster. Connect to the cluster master node and execute the code via spark-submit. Make sure that the cluster version is at least 5.28.0 with at least Hadoop and Spark installed and that you use the Data Catalog as table metadata for Spark.

The master node should also be accessible via SSH. For instructions, see Connect to the Master Node Using SSH.

Downloading the application

You can download the source code and create a uber jar with all dependencies from the application GitHub repo. You can build the application as a uber jar with all dependencies using the Scala Build Tool (sbt) with the following commands (adjust the memory values according to your needs):

$ export SBT_OPTS="-Xms1G -Xmx3G -Xss2M -XX:MaxMetaspaceSize=3G" && sbt assembly

By default, the .jar file is created in the following path, relative to the project root directory:

./target/scala-2.11/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar

When the .jar file is available, copy it to the master node of the EMR cluster. One way to copy the file to the master node code is to copy it to Amazon S3 from the client where the file was created and download it from Amazon S3 to the master node.

For this post, copy the file in the /home/hadoop directory of the master node. The full path is /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar.

Setting up S3 buckets and copy initial data

You use an S3 bucket to store the data that you profile. For this post, the bucket name is

s3://aws-big-data-blog-samples/

You need to create a bucket with a unique name in your account and store the data there. When the bucket is created and available, use the AWS CLI to copy the first set of files for January 2019 (therefore simulating the February 2, 2019, run) from the s3://nyc-tlc/ bucket. See the following code:

$ DEST_BUCKET=aws-big-data-blog-samples
$ MONTH=2019-01
 
$ aws s3 cp "s3://nyc-tlc/trip data/yellow_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-yellow/yellow_tripdata_${MONTH}.csv"
 
$ aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-fhv/fhv_tripdata_${MONTH}.csv"
 
$ aws s3 cp "s3://nyc-tlc/trip data/green_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-green/green_tripdata_${MONTH}.csv"

After you copy the data to your destination bucket, create a second bucket to store the files with the profiling metrics created by the application. This step is optional because you can write the metrics to a prefix in an existing bucket. See the following code:

$ aws s3 mb s3://deequ-profiler/

You are now ready to create the database and tables metadata in the Data Catalog.

Creating metadata in the Data Catalog

The first step is to create a database in AWS Glue. You can create the database using the AWS CLI. See the following code:

$ aws glue create-database \
    --database-input '{"Name": "nyctlcdb"}'

Alternatively, on the AWS Glue console, choose Databases, Add database.

After you create the database, create a new AWS Glue Crawler to infer the schema of the data in the files you copied in the previous step. Complete the following steps:

  1. On the AWS Glue console, choose Crawler.
  2. Choose Add crawler.
  3. For Crawler name, enter nyc-tlc-db-raw.
  4. Choose Next.
  5. For Choose a data store, choose S3.
  6. For Crawl data in, choose Specified path in my account.
  7. For Include path, enter the S3 bucket and prefix where you copied the data earlier.
  8. Choose Next.
  9. In the Choose an IAM role section, select Choose an existing IAM role.
  10. Choose an IAM role that provides access to the S3 bucket and allows writing to the Data Catalog, or create a new one while creating this crawler.
  11. Choose Next.
  12. Choose the database you created earlier to store the tables’ metadata.
  13. Review the crawler properties and choose Finish.
    You can run the crawler when it’s ready. It creates three new tables in the database. The following screenshot shows the update you receive that the crawler is complete.
  14. You can now use the Lake Formation console to check the tables are correct. See the following screenshot of the Tables.If you select one of the tables, the table version is now 0. See the following screenshot.You can also perform the same check using the AWS CLI. See the following code:
    $ aws glue get-table-versions \
    --database-name nyctlcdb \
    --table-name trip_data_yellow \
    --query 'TableVersions[*].VersionId' 

    [
        "0"
    ]

  15. Check the parameters in the table metadata to verify which values the crawler generated. See the following code:
    $ aws glue get-table \
    	--database-name nyctlcdb \
    	--name trip_data_yellow \
       	--query 'Table.Parameters' 

    {
        "CrawlerSchemaDeserializerVersion": "1.0",
        "CrawlerSchemaSerializerVersion": "1.0",
        "UPDATED_BY_CRAWLER": "nyc-tlc-db-raw",
        "areColumnsQuoted": "false",
        "averageRecordSize": "144",
        "classification": "csv",
        "columnsOrdered": "true",
        "compressionType": "none",
        "delimiter": ",",
        "objectCount": "1",
        "recordCount": "4771445",
        "sizeKey": "687088084",
        "skip.header.line.count": "1",
        "typeOfData": "file"
    }

  16. Check the metadata attributes for three columns in the same table. This post chooses the following columns because they have different data types, though any other column is suitable for this check. In the following code, the only attributes currently available for the columns are “Name” and “Type:”
    $ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]'

    [
        {
            "Name": "store_and_fwd_flag",
            "Type": "string"
        }
    ]

    $ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`fare_amount`]'

    [
        {
            "Name": "fare_amount",
            "Type": "double"
        }
    ]

    $ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`passenger_count`]'

    [
        {
            "Name": "passenger_count",
            "Type": "bigint"
        }
    ]

You can display the same information via the Lake Formation console. See the following screenshot.

You are now ready to execute the application.

First application execution

Connect to the EMR cluster master node via SSH and run the application with the following code (change the input parameters as needed, especially the value for the s3BucketPrefix parameter):

$ spark-submit \
    --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
    --master yarn \
    --deploy-mode cluster \
    --name data-profiler-for-aws-glue-data-catalog \
    /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
    --dbName nyctlcdb \
    --region eu-west-1 \
    --compExp true \
    --statsPrefix DQP \
    --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
    --profileUnsupportedTypes true \
    --noOfBins 30 \
    --quantiles 10

Profiling information in the metadata in the Data Catalog

When the application is complete, you can recheck the metadata via the Lake Formation console for the tables and verify that a new table version was created. See the following screenshot.

You can verify the same information via the AWS CLI. See the following code:

$ aws glue get-table-versions \
    --database-name nyctlcdb \
    --table-name trip_data_yellow \
    --query 'TableVersions[*].VersionId'
[
    "1",
    "0"
]

Check the metadata for the table and verify that the profiling information the application generated was successfully stored. In the following code, the parameter “DQP__Size” was generated, which contains the number of records in the table as calculated by the Deequ library:

$ aws glue get-table \ 
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.Parameters'
{
    "CrawlerSchemaDeserializerVersion": "1.0-",
    "CrawlerSchemaSerializerVersion": "1.0",
    "DQP__Size": "7667793.0",
    "UPDATED_BY_CRAWLER": "nyc-tlc-db-raw",
    "areColumnsQuoted": "false",
    "averageRecordSize": "144",
    "classification": "csv",
    "columnsOrdered": "true",
    "compressionType": "none",
    "delimiter": ",",
    "objectCount": "1",
    "recordCount": "4771445",
    "sizeKey": "687088084",
    "skip.header.line.count": "1",
    "typeOfData": "file"
}

Similarly, you can verify that the metadata for the columns you checked previously contains the profiling information the application generated. This is stored in the “Parameters” object for each column. Each new attribute starts with the string “DQP” as specified in the statsPrefix input parameter. See the following code:

$ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]' 
[
    {
        "Name": "store_and_fwd_flag",
        "Type": "string",
        "Parameters": {
            "DQP__ApproxCountDistinct": "3.0",
            "DQP__Completeness": "1.0",
            "DQP__CountDistinct": "3.0",
            "DQP__Distinctness": "3.912468685578758E-7",
            "DQP__Entropy": "0.03100483390393341",
            "DQP__Histogram.abs.N": "7630142.0",
            "DQP__Histogram.abs.Y": "37650.0",
            "DQP__Histogram.abs.store_and_fwd_flag": "1.0",
            "DQP__Histogram.bins": "3.0",
            "DQP__Histogram.ratio.N": "0.9950897213839758",
            "DQP__Histogram.ratio.Y": "0.004910148200401341",
            "DQP__Histogram.ratio.store_and_fwd_flag": "1.3041562285262527E-7",
            "DQP__MaxLength": "18.0",
            "DQP__MinLength": "1.0",
            "DQP__UniqueValueRatio": "0.3333333333333333",
            "DQP__Uniqueness": "1.3041562285262527E-7"
        }
    }
]
$ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`fare_amount`]'
[
    {
        "Name": "fare_amount",
        "Type": "double",
        "Parameters": {
            "DQP__ApproxCountDistinct": "6125.0",
            "DQP__Completeness": "0.9999998695843771",
            "DQP__Distinctness": "8.187492802687814E-4",
            "DQP__Maximum": "623259.86",
            "DQP__Mean": "12.40940884025023",
            "DQP__Minimum": "-362.0",
            "DQP__StandardDeviation": "262.0720412055651",
            "DQP__Sum": "9.515276582999998E7",
            "DQP__name-0.1": "5.0",
            "DQP__name-0.2": "6.0",
            "DQP__name-0.3": "7.0",
            "DQP__name-0.4": "8.0",
            "DQP__name-0.5": "9.0",
            "DQP__name-0.6": "10.5",
            "DQP__name-0.7": "12.5",
            "DQP__name-0.8": "15.5",
            "DQP__name-0.9": "23.5",
            "DQP__name-1.0": "623259.86"
        }
    }
]

The parameters named “DQP__name-x.x” are the results of the ApproxQuantiles Deequ analyzer for numeric columns; the number of quantiles is set via the –quantiles (-q) input parameter of the application. See the following code:

$ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`passenger_count`]'
[
    {
        "Name": "passenger_count",
        "Type": "bigint",
        "Parameters": {
            "DQP__ApproxCountDistinct": "10.0",
            "DQP__Completeness": "0.9999998695843771",
            "DQP__Distinctness": "1.3041562285262527E-6",
            "DQP__Maximum": "9.0",
            "DQP__Mean": "1.5670782410373156",
            "DQP__Minimum": "0.0",
            "DQP__StandardDeviation": "1.2244305354114957",
            "DQP__Sum": "1.201603E7",
            "DQP__name-0.1": "1.0",
            "DQP__name-0.2": "1.0",
            "DQP__name-0.3": "1.0",
            "DQP__name-0.4": "1.0",
            "DQP__name-0.5": "1.0",
            "DQP__name-0.6": "1.0",
            "DQP__name-0.7": "1.0",
            "DQP__name-0.8": "2.0",
            "DQP__name-0.9": "3.0",
            "DQP__name-1.0": "9.0"
        }
    }
]

Profiling information in Amazon S3

You can now also verify that the profiling information was saved in Parquet format in the S3 bucket you specified in the s3BucketPrefix application input parameter. The following screenshot shows the buckets via the Amazon S3 console.

The data is stored using prefixes that are compatible with Apache Hive partitions. This is useful to optimize performance and costs when you use analytics services like Athena. The partitions are defined on db_name and table_name. The following screenshot shows the details of table_name=trip_data_yellow.

Each execution of the application generates one Parquet file appending data to the metrics table for each physical table.

Second execution after monthly table updates

To run the application after monthly table updates, complete the following steps:

  1. Copy the new files for February 2019 to simulate the March 2 monthly update of the system. See the following code:
    $ DEST_BUCKET=aws-big-data-blog-samples
    $ MONTH=2019-02
     
    $ aws s3 cp "s3://nyc-tlc/trip data/yellow_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-yellow/yellow_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_${MONTH}" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-fhv/fhv_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/green_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-green/green_tripdata_${MONTH}.csv"

  2. Run the nyc-tlc-db-raw crawler to update the table metadata to include the new files. The following screenshot shows that the three tables were updated successfully.
  3. Check that the crawler created a third version of the table. See the following code:
    $ aws glue get-table-versions \
        --database-name nyctlcdb \
        --table-name trip_data_yellow \
        --query 'TableVersions[*].VersionId'

    [
        "2",
        "1",
        "0"
    ]

  4. Rerun the application to generate the new profiling metadata, entering the same code as before. To keep clean information, before storing new profiling information in the metadata, the application removes all custom attributes starting with the string specified in the “statsPrefix” See the following code:
    $ spark-submit \
        --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
        --master yarn \
        --deploy-mode cluster \
        --name data-profiler-for-aws-glue-data-catalog \
        /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
        --dbName nyctlcdb \
        --region eu-west-1 \
        --compExp true \
        --statsPrefix DQP \
        --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
        --profileUnsupportedTypes true \
        --noOfBins 30 \
        --quantiles 10

    Following a successful execution, a new version of the table was created. See the following code:

    $ aws glue get-table-versions \
        --database-name nyctlcdb \
        --table-name trip_data_yellow \
        --query 'TableVersions[*].VersionId'

    [
        "3",
        "2",
        "1",
        "0"
    ]

  5. Check the value of the DQP__Size attribute; its value has changed. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query Table.Parameters.{'DQP__Size:DQP__Size}'

    {
        "DQP__Size": "1.4687169E7"
    }

  6. Check one of the columns you saw earlier to see the updated profiling properties values. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]'

    [
        {
            "Name": "store_and_fwd_flag",
            "Type": "string",
            "Parameters": {
                "DQP__ApproxCountDistinct": "3.0",
                "DQP__Completeness": "1.0",
                "DQP__CountDistinct": "3.0",
                "DQP__Distinctness": "2.042599223853147E-7",
                "DQP__Entropy": "0.0317381414905775",
                "DQP__Histogram.abs.N": "1.4613018E7",
                "DQP__Histogram.abs.Y": "74149.0",
                "DQP__Histogram.abs.store_and_fwd_flag": "2.0",
                "DQP__Histogram.bins": "3.0",
                "DQP__Histogram.ratio.N": "0.9949513074984022",
                "DQP__Histogram.ratio.Y": "0.005048556328316233",
                "DQP__Histogram.ratio.store_and_fwd_flag": "1.361732815902098E-7",
                "DQP__MaxLength": "18.0",
                "DQP__MinLength": "1.0",
                "DQP__UniqueValueRatio": "0.0",
                "DQP__Uniqueness": "0.0"
            }
        }
    ]

Third execution after monthly tables updates

To run the application a third time, complete the following steps:

  1. Copy the new files for March 2019 to simulate the April 2 monthly update of the system. See the following code:
    $ DEST_BUCKET=aws-big-data-blog-samples
    $ MONTH=2019-03
     
    $ aws s3 cp "s3://nyc-tlc/trip data/yellow_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-yellow/yellow_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-fhv/fhv_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/green_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-green/green_tripdata_${MONTH}.csv"

  2. Run the nyc-tlc-db-raw crawler to update the table metadata to include the new files. You now have five versions of the table metadata. See the following code:
    $ aws glue get-table-versions \
        --database-name nyctlcdb \
        --table-name trip_data_yellow \
        --query 'TableVersions[*].VersionId'

    [
        "4",
        "3",
        "2",
        "1",
        "0"
    ]

  3. Rerun the application to update the profiling information. See the following code:
    $ spark-submit \
        --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
        --master yarn \
        --deploy-mode cluster \
        --name data-profiler-for-aws-glue-data-catalog \
        /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
        --dbName nyctlcdb \
        --region eu-west-1 \
        --compExp true \
        --statsPrefix DQP \
        --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
        --profileUnsupportedTypes true \
        --noOfBins 30 \
        --quantiles 10

  4. Check the DQP__Size parameter to see its new updated value. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query Table.Parameters.{'DQP__Size:DQP__Size}'

    {
        "DQP__Size": "2.2519715E7"
    }

  5. Check one of the columns you saw earlier to the update profiling properties values. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]' 

    [
        {
            "Name": "store_and_fwd_flag",
            "Type": "string",
            "Parameters": {
                "DQP__ApproxCountDistinct": "3.0",
                "DQP__Completeness": "1.0",
                "DQP__CountDistinct": "3.0",
                "DQP__Distinctness": "1.3321660598280218E-7",
                "DQP__Entropy": "0.030948463301702846",
                "DQP__Histogram.abs.N": "2.2409376E7",
                "DQP__Histogram.abs.Y": "110336.0",
                "DQP__Histogram.abs.store_and_fwd_flag": "3.0",
                "DQP__Histogram.bins": "3.0",
                "DQP__Histogram.ratio.N": "0.9951003376374878",
                "DQP__Histogram.ratio.Y": "0.004899529145906154",
                "DQP__Histogram.ratio.store_and_fwd_flag": "1.3321660598280218E-7",
                "DQP__MaxLength": "18.0",
                "DQP__MinLength": "1.0",
                "DQP__UniqueValueRatio": "0.0",
                "DQP__Uniqueness": "0.0"
            }
        }
    ]

You can view and manage the same values via the Lake Formation console. See the following screenshot of the Edit column section.

Data profiling reporting with Athena and Amazon QuickSight

As demonstrated earlier, the application can save profiling information in Parquet format to an S3 bucket and prefix into db_name and table_name partitions. See the following code:

$ aws s3 ls s3://deequ-profiler/deequ-profiler-metrics/ --recursive
2020-01-28 09:30:12          0 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/_SUCCESS
2020-01-28 09:17:15       6506 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/part-00000-760dafb1-fc37-4700-a506-a9dc71b7a745-c000.snappy.parquet
2020-01-28 09:01:19       6498 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/part-00000-78dd2c4a-83c2-44c4-aa71-30e7a9fb0089-c000.snappy.parquet
2020-01-28 09:30:11       6505 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/part-00000-cff4f2de-64b4-4338-a0f6-a50ed34a378f-c000.snappy.parquet
2020-01-28 09:30:08          0 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/_SUCCESS
2020-01-28 09:01:15       6355 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/part-00000-0d5969c9-70a7-4cd4-ac64-8f16e35e23b5-c000.snappy.parquet
2020-01-28 09:17:11       6353 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/part-00000-12a7b0b0-6a2a-45d5-a241-645148af41d7-c000.snappy.parquet
2020-01-28 09:30:08       6415 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/part-00000-adecccd6-a884-403f-aa80-c574647a10f9-c000.snappy.parquet
2020-01-28 09:29:56          0 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/_SUCCESS
2020-01-28 09:16:59       6408 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/part-00000-2e5e3280-29db-41b9-be67-a68ef8cb9777-c000.snappy.parquet
2020-01-28 09:01:02       6424 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/part-00000-c4972037-7d3c-4279-8b77-361741133816-c000.snappy.parquet
2020-01-28 09:29:55       6398 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/part-00000-f2d6076e-7019-4b03-97ba-a6aab8a677b8-c000.snappy.parquet

The application generates one Parquet file per execution.

Preparing metadata for profiler metrics data

To prepare the metadata for profiler metrics data, complete the following steps:

  1. On the Lake Formation console, create a new database with the name deequprofilerdb to contain the metadata.
  2. On the AWS Glue console, create a new crawler with the name deequ-profiler-metrics to infer the schema of the profiling information stored in Amazon S3.

The following screenshot shows the properties of the new crawler.

After you run the crawler, one table with the name deequ_profiler_metrics was created in the database. The table has the following columns.

NameData TypePartitionDescription
instancestringColumn name the statistic in column “name” refers to. Set to “*” if entity is “Dataset”.
entitystringEntity the statistic refers to. Valid values are “Column” and “Dataset”.
namestringMetrics name, derived from the Deequ Analyzer used for the calculation.
valuedoubleValue of the metric.
typestringData type of the column if entity is “Column”, blank otherwise.
db_name_embedstringDatabase name, same values as in partition “db_name”.
table_name_embedstringTable name, same values as in partition “table_name”.
profiler_run_dtdateDate the profiler application was run.
profiler_run_tstimestampDate/time the profile application was run; it can also be used as execution identifier.
db_namestring1Database name.
table_namestring2Table name.

Reporting with Athena

You can use Athena to run a query that checks the statistics for a column in the database for the execution you ran in March 2019. See the following code:

SELECT db_name, 
    profiler_run_dt,
    profiler_run_ts,
    table_name, 
    entity,
    instance,
    name,
    value
FROM "deequprofilerdb"."deequ_profiler_metrics" 
WHERE db_name = 'nyctlcdb' AND
    table_name = 'trip_data_yellow' AND 
    entity = 'Column' AND
    instance = 'extra' AND
    profiler_run_dt = date_parse('2019-03-02','%Y-%m-%d')
ORDER BY name
;

The following screenshot shows the query results.

Reporting with Amazon QuickSight

To create a dashboard in Amazon QuickSight based on the profiling metrics data the application generated, complete the following steps:

  1. Create a new QuickSight dataset called deequ_profiler_metrics with Athena as the data source.
  2. In the Choose your table section, select the profiling metrics table that you created earlier.
  3. Import the data into SPICE.

After you create the dataset, you can view it and edit its properties. For this post, leave the properties unchanged.

You are now ready to build visualizations and dashboards.

The following images in this section show a simple analysis with controls that allow for the selection of the Database, Table profiled, Entity, Column, and Profiling Metric.

Control NameMapped Column
Databasedb_name
Tabletable_name
Entityentity
Columninstance
Profiling Metricname

For more information about adding controls, see Create Amazon QuickSight dashboards that have impact with parameters, on-screen controls, and URL actions.

For example, you can select the Size metric of a specific table to see how many records are available in the table after each monthly load. See the following screenshot.

Similarly, you can use the same analysis to see how a specific metric changes over time for a column. The following screenshot shows that the mean of the fare_amount column changes after each monthly load.

You can select any metric calculated on any column, which makes for a very flexible profiling data reporting system.

Conclusion

This post demonstrated how to extend the metadata contained in the Data Catalog with profiling information calculated with an Apache Spark application based on the Amazon Deequ library running on an EMR cluster.

You can query the Data Catalog using the AWS CLI. You can also build a reporting system with Athena and Amazon QuickSight to query and visualize the data stored in Amazon S3.

Special thanks go to Sebastian Schelter at Amazon Search and Sven Hansen and Vincent Gromakowski at AWS for their help and support

 


About the Author

Francesco Marelli is a senior solutions architect at Amazon Web Services. He has lived and worked in London for 10 years, after that he has worked in Italy, Switzerland and other countries in EMEA. He is specialized in the design and implementation of Analytics, Data Management and Big Data systems, mainly for Enterprise and FSI customers. Francesco also has a strong experience in systems integration and design and implementation of web applications. He loves sharing his professional knowledge, collecting vinyl records and playing bass.

Deploy an Amazon EMR edge node with RStudio using AWS Systems Manager

Post Syndicated from Randy DeFauw original https://aws.amazon.com/blogs/big-data/deploy-an-amazon-emr-edge-node-with-rstudio-using-aws-systems-manager/

RStudio is an integrated development environment (IDE) for R, a language and environment for statistical computing and graphics. As a data scientist, you may integrate R and Spark (a big data processing framework) to analyze large datasets. You can use an R package called sparklyr to offload filtering and aggregation of large datasets from your R script to Spark and use R’s native strength to further analyze and visualize the results from Spark.

An R script running in RStudio uses sparklyr to submit Spark jobs to the cluster. Typically, an R script (along with sparklyr) runs in an RStudio environment that is installed on a machine that’s separate from the cluster of machines (in Amazon EMR) that runs Spark. To enable sparklyr to submit Spark jobs, you need to establish network connectivity between the RStudio machine and the cluster running Spark. One way to do that is to run RStudio on an edge node, which is a machine that is part of the cluster’s private network and runs client applications like RStudio. Edge nodes let you run client applications separately from the nodes that run the core Hadoop services. Edge nodes also offer convenient access to local Spark and Hive shells.

However, edge nodes are not easy to deploy. They must have the same versions of Hadoop, Spark, Java, and other tools as the Hadoop cluster, and require the same Hadoop configuration as nodes in the cluster.

This post demonstrates an automated way to create an edge node with RStudio installed using AWS Systems Manager.

Deploying an edge node for an EMR cluster

One method to deploy an edge node involves creating an Amazon EC2 AMI directly from the EMR master node. For more information, see Launch an edge node for Amazon EMR to run RStudio. This post offers an SSM automation document that simplifies on-demand edge node deployment. Systems Manager gives you visibility and control of your AWS infrastructure, and Systems Manager Automation lets you safely automate common and repetitive tasks, like creating edge nodes on demand.

This post walks you through the process of installing the SSM document and how to use the document to create an edge node. For more information about the code, see the GitHub repo.

Creating the automation document

First, you use Terraform to create the automation document. You can download Terraform from the Terraform website. Alternatively, AWS CloudFormation works equally well.

After you install Terraform, go to the directory where you cloned the repo and edit the file vars.tf. For more information, see the GitHub repo. This file defines several input parameters, and the comments in the file should be self-explanatory. You can provide default values in vars.tf or override using one of the other supported techniques. For more information, see Input Variables on the Terraform website.

Next, enter the following code:

tf init # one time only
tf apply

The code runs a Terraform plan to create the document. Your environment should already be configured to access your AWS account with privileges to do the following:

To make updates to the Terraform plan going forward, use Terraform’s shared state feature. For more information, see Remote State on the Terraform website.

The Terraform plan loads your automation document from a local template file and registers it with Systems Manager. See the following code:

# Load our document from a template and substitute some variables.
data "template_file" "ssm_doc_edge_node" {
  template = "${file("${path.module}/ssm_doc_edge_node.tpl")}"

  vars = {
    SSMRoleArn = "${aws_iam_role.ssm_automation_role.arn}"
    InstanceProfileArn = "${aws_iam_instance_profile.edge_node_profile.arn}"
    PlaybookUrl = "s3://${var.bucket}/init.yaml"
    Environment = "${var.environment}"
    Project = "${var.ProjectTag}"
    region = "${var.region}"
  }
}

# Register the document content with SSM
resource "aws_ssm_document" "create_edge_node" {
  name          = "create_edge_node"
  document_type = "Automation"
  document_format = "YAML"
  tags = {
    Name = "create_edge_node"
    Project = "${var.ProjectTag}"
    Environment = "${var.environment}"
  }

  content = "${data.template_file.ssm_doc_edge_node.rendered}"
}

The rest of the Terraform plan does the following:

  • Uploads an Ansible template for the SSM document to use
  • Sets up IAM roles and policies that let Systems Manager and a new edge node assume the correct privileges

What’s in the automation document?

The automation document has three main steps. First, it creates and launches a new AMI from the existing EMR master node. See the following code:

- name: create_ami
  action: aws:createImage
  maxAttempts: 1
  timeoutSeconds: 1200
  onFailure: Abort
  inputs:
    InstanceId: "{{MasterNodeId}}"
    ImageName: AMI Created on{{global:DATE_TIME}}
    NoReboot: true
    
- name: launch_ami
  action: aws:runInstances
  maxAttempts: 1
  timeoutSeconds: 1200
  onFailure: Abort
  inputs:
    ImageId: "{{create_ami.ImageId}}"
  ...

Next, it updates the SSM agent and runs an Ansible playbook to install RStudio. You can examine the Ansible playbook in GitHub; it installs RStudio and dependencies and handles some initial configuration. See the following code:

- name: updateSSMAgent
  action: aws:runCommand
  inputs:
    DocumentName: AWS-UpdateSSMAgent
    InstanceIds:
    - "{{launch_ami.iid}}"
- name: installPip
  action: aws:runCommand
  inputs:
    DocumentName: AWS-RunShellScript
    InstanceIds:
    - "{{launch_ami.iid}}"
    Parameters:
        commands: 
        - pip install ansible boto3 botocore
- name: runPlaybook
  action: aws:runCommand
  inputs:
    DocumentName: AWS-RunAnsiblePlaybook
    InstanceIds:
    - "{{launch_ami.iid}}"
    Parameters:
      playbookurl: "${PlaybookUrl}"

Finally, it adds an Amazon CloudWatch alarm to trigger EC2 instance recovery if the edge node fails. See the following code:

- name: add_recovery
  action: aws:executeAwsApi
  inputs:
    Service: cloudwatch
    Api: PutMetricAlarm
    AlarmName: "Recovery for edge node {{ launch_ami.iid }}"
    ActionsEnabled: true
    AlarmActions: 
    - "arn:aws:automate:${region}:ec2:recover"

Using the automation document

To start using the automation document, complete the following steps:

  1. On the Systems Manager console, choose Automation.
  2. Choose Execute automation.
  3. On the Owned by me tab, choose the document create_edge_node.
  4. Choose Next.

    On the next page, you need to fill in three pieces of information. You may want to get some advice from your cloud operations team, or whomever manages your EMR clusters. For instructions on creating a cluster with the latest EMR version and Spark, see Launch Your Sample Amazon EMR Cluster.
  5. In the Input parameters section, provide the following information:
    • For MasterNodeId, enter the EC2 instance ID of the master node of the EMR cluster you want to connect to.In most cases, your operations team can provide this information, but you can also find the instance ID by going to the Hardware tab of your EMR cluster and drilling into the master node group. Your EMR cluster must have Spark installed because you want to use sparklyr with RStudio.The following screenshot shows where to find your EC2 instance ID on the Hardware tab.
    • For SubnetId, enter the subnet that the edge node should live in. Your operations team should provide this information, or you can see it on the Summary tab of the EMR cluster. The edge node must live in the same VPC as the cluster. It does not need to be in a public subnet because you connect via Session Manager.The following screenshot shows where to find your subnet ID on the Summary tab of your cluster.
    • For QuickIdentifier, enter a user-friendly name to help you remember this edge node; for example, Edge Node with RStudio.

When the execution is finished, you will see the completed steps, as in the following screenshot.

If you choose the last step in the list (step 8), you see the DNS name and EC2 instance ID for your new edge node. See the following screenshot.

You can now connect to this edge node by using another feature of Systems Manager: Session Manager. Session Manager lets you open an SSH tunnel for port forwarding without having to use SSH keys or expose the SSH port to the internet. For instructions on opening a port forwarding session, see Starting a Session (Port Forwarding). You need the Session Manager plugin installed locally. See the following code:

aws ssm start-session \
  --target instance-id \ # Get this from the output of the automation document
  --document-name AWS-StartPortForwardingSession \
  --parameters '{"portNumber":["8787"], "localPortNumber":["8787"]}'

For more information, see Install the Session Manager Plugin for the AWS CLI.

You can now access RStudio at http://localhost:8787. See the following screenshot.

You can also access the node directly and use the local Hive and Spark shells through the Session Manager console.

This post sets up the SSM document to create single-user edge nodes. The default user name to log in to RStudio is ruser. You must set the password by changing the password for the ruser account directly in the operating system, because RStudio uses PAM authentication by default. For more information, see What is my username on my RStudio Server? To change the password, open another Session Manager session and enter the following code:

aws ssm start-session \
  --target instance-id \ # Get this from the output of the automation document

$ sudo passwd ruser # Enter a password of your choice and confirm it

You should keep any valuable files like R scripts in a GitHub repo and store any output data in an S3 bucket for long-term persistence.

Configuring security

The Terraform plan sets up three important IAM roles:

  • A role that Systems Manager assumes when running the automation document. This role needs to perform actions in Amazon EC2, like creating new AMIs, CloudWatch, and Systems Manager.
  • An EC2 instance profile for the edge nodes. Theprofile has the permissions necessary for the SSM agent to run and for the edge node to perform tasks typical of an EMR node.
  • A role for CloudWatch to perform instance recovery.

In your environment, you may want to review the IAM roles and policies and tighten their scope based on tags or other conditions.

Conclusion

This post described an automated way to deploy an EMR edge node with RStudio using an SSM document. EMR edge nodes with RStudio give you a familiar working environment with access to large datasets via Spark and sparklyr. For information about deploying a new edge node and installing the necessary Hadoop libraries with an AWS CloudFormation template, see Launch an edge node for Amazon EMR to run RStudio.

 


About the Author

Randy DeFauw is a principal solutions architect at Amazon Web Services. He works with the AWS customers to provide guidance and technical assistance on database projects, helping them improve the value of their solutions when using AWS.

 

 

Simplify your Spark dependency management with Docker in EMR 6.0.0

Post Syndicated from Paul Codding original https://aws.amazon.com/blogs/big-data/simplify-your-spark-dependency-management-with-docker-in-emr-6-0-0/

Apache Spark is a powerful data processing engine that gives data analyst and engineering teams easy to use APIs and tools to analyze their data, but it can be challenging for teams to manage their Python and R library dependencies. Installing every dependency that a job may need before it runs and dealing with library version conflicts is time-consuming and complicated. Amazon EMR 6.0.0 simplifies this by allowing you to use Docker images from Docker Hub and Amazon ECR to package your dependencies. This allows you to package and manage your dependencies for individual Spark jobs or notebooks, without having to manage a spiderweb of dependencies across your cluster.

This post shows you how to use Docker to manage notebook dependencies with Amazon EMR 6.0.0 and EMR Notebooks. You will launch an EMR 6.0.0 cluster and use notebook-specific Docker images from Amazon ECR with your EMR Notebook.

Creating a Docker image

The first step is to create a Docker image that contains Python 3 and the latest version of the numpy Python package. You create Docker images by using a Dockerfile, which defines the packages and configuration to include in the image. Docker images used with Amazon EMR 6.0.0 must contain a Java Development Kit (JDK); the following Dockerfile uses Amazon Linux 2 and the Amazon Corretto JDK 8:

FROM amazoncorretto:8

RUN yum -y update
RUN yum -y install yum-utils
RUN yum -y groupinstall development

RUN yum list python3*
RUN yum -y install python3 python3-dev python3-pip python3-virtualenv

RUN python -V
RUN python3 -V

ENV PYSPARK_DRIVER_PYTHON python3
ENV PYSPARK_PYTHON python3

RUN pip3 install --upgrade pip
RUN pip3 install numpy 

RUN python3 -c "import numpy as np"

You will use this Dockerfile to create a Docker image, and then tag and upload it to Amazon ECR. After you upload it, you will launch an EMR 6.0.0 cluster that is configured to use this Docker image as the default image for Spark jobs. Complete the following steps to build, tag, and upload your Docker image:

  1. Create a directory and a new file named Dockerfile using the following commands:
    $ mkdir pyspark-latest
    $ vi pyspark-latest/Dockerfile

  2. Copy, and then paste the contents of the Dockerfile, save it, and run the following command to build a Docker image:
    $ docker build -t 'local/pyspark-latest' pyspark-latest/

  3. Create the emr-docker-examples Amazon ECR repository for this walkthrough using the following command:
    $ aws ecr create-repository --repository-name emr-docker-examples

  4. Tag the locally built image and replace 123456789123.dkr.ecr.us-east-1.amazonaws.com with your Amazon ECR endpoint, using the following command:
    $ docker tag local/pyspark-latest 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:pyspark-latest

    Before you can push the Docker image to Amazon ECR, you need to log in.

  5. To get the login line for your Amazon ECR account, use the following command:
    $ aws ecr get-login --region us-east-1 --no-include-email

  6. Enter and run the output from the get-login command:
    $ docker login -u AWS -p <password> https://123456789123.dkr.ecr.us-east-1.amazonaws.com 

  7. Upload the locally built image to Amazon ECR and replace 123456789123.dkr.ecr.us-east-1.amazonaws.com with your Amazon ECR endpoint. See the following command:
    $ docker push 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:pyspark-latest

After this push is complete, the Docker image is available to use with your EMR cluster.

Launching an EMR 6.0.0 cluster with Docker enabled

To use Docker with Amazon EMR, you must launch your EMR cluster with Docker runtime support enabled and have the right configuration in place to connect to your Amazon ECR account. To allow your cluster to download images from Amazon ECR, makes sure the instance profile for your cluster has the permissions from the AmazonEC2ContainerRegistryReadOnly managed policy associated with it. The configuration in the first step below configures your EMR 6.0.0 cluster to use Amazon ECR to download Docker images, and configures Apache Livy and Apache Spark to use the pyspark-latest Docker image as the default Docker image for all Spark jobs. Complete the following steps to launch your cluster:

  1. Create a file named emr-configuration.json in the local directory with the following configuration (replace 123456789123.dkr.ecr.us-east-1.amazonaws.com with your Amazon ECR endpoint):
    [
       {
          "Classification":"container-executor",
          "Properties":{
    
          },
          "Configurations":[
             {
                "Classification":"docker",
                "Properties":{
                   "docker.privileged-containers.registries":"local,centos,123456789123.dkr.ecr.us-east-1.amazonaws.com",
                   "docker.trusted.registries":"local,centos,123456789123.dkr.ecr.us-east-1.amazonaws.com"
               }
             }
          ]
       },
       {
          "Classification":"livy-conf",
          "Properties":{
             "livy.spark.master":"yarn",
             "livy.spark.deploy-mode":"cluster",
             "livy.server.session.timeout":"16h"
          }
       },
       {
          "Classification":"hive-site",
          "Properties":{
             "hive.execution.mode":"container"
          }
       },
       {
          "Classification":"spark-defaults",
          "Properties":{
             "spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE":"docker",
             "spark.yarn.am.waitTime":"300s",
             "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE":"docker",
             "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG":"hdfs:///user/hadoop/config.json",
             "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE":"123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:pyspark-latest",
             "spark.executor.instances":"2",
             "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG":"hdfs:///user/hadoop/config.json",
             "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE":"123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:pyspark-latest"
          }
       }
    ]

    You will use that configuration to launch your EMR 6.0.0 cluster using the AWS CLI.

  2. Enter the following commands (replace myKey with the name of the EC2 key pair you use to access the cluster using SSH, and subnet-1234567 with the subnet ID the cluster should be launched in):
    aws emr create-cluster \
    --name 'EMR 6.0.0 with Docker' \
    --release-label emr-6.0.0 \
    --applications Name=Livy Name=Spark \
    --ec2-attributes "KeyName=myKey,SubnetId=subnet-1234567" \
    --instance-type m5.xlarge --instance-count 3 \
    --use-default-roles \
    --configurations file://./emr-configuration.json

    After the cluster launches and is in the Waiting state, make sure that the cluster hosts can authenticate themselves to Amazon ECR and download Docker images.

  3. Use your EC2 key pair to SSH into one of the core nodes of the cluster.
  4. To generate the Docker CLI command to create the credentials (valid for 12 hours) the cluster uses to download Docker images from Amazon ECR, enter the following command:
    $ aws ecr get-login --region us-east-1 --no-include-email

  5. Enter and run the output from the get-login command:
    $ sudo docker login -u AWS -p <password> https://123456789123.dkr.ecr.us-east-1.amazonaws.com 

    This command generates a config.json file in the /root/.docker folder.

  6. Place the generated config.json file in the HDFS location /user/hadoop/ using the following command:
    $ sudo hdfs dfs -put /root/.docker/config.json /user/hadoop/

Now that you have an EMR cluster that is configured with Docker and an image in Amazon ECR, you can use EMR Notebooks to create and run your notebook.

Creating an EMR Notebook

EMR Notebooks are serverless Jupyter notebooks available directly through the Amazon EMR console. They allow you to separate your notebook environment from your underlying cluster infrastructure, and access your notebook without spending time setting up SSH access or configuring your browser for port-forwarding. You can find EMR Notebooks in the left-hand navigation of the EMR console.

To create your notebook, complete the following steps:

  1. Click on Notebooks in the EMR Console
  2. Choose a name for your notebook
  3. Click Choose an existing cluster and select the cluster you just created
  4. Click Create notebook
  5. Once your notebook is in a Ready status, you can click the Open in JupyterLab button to open it in a new browser tab. A default notebook with the name of your EMR Notebook is created by default. When you click on that notebook, you’ll be asked to choose a Kernel. Choose PySpark.
  6. Enter the following configuration into the first cell in your notebook and click ▸(Run):
    %%configure -f
    {"conf": { "spark.pyspark.virtualenv.enabled": "false" }}

  7. Enter the following PySpark code into your notebook and click ▸(Run):
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("docker-numpy").getOrCreate()
    sc = spark.sparkContext
    
    import numpy as np
    a = np.arange(15).reshape(3, 5)
    print(a)
    print(np.__version__)

The output should look like the following screenshot; the numpy version in use is the latest (at the time of this writing, 1.18.2).

This PySpark code was run on your EMR 6.0.0 cluster using YARN, Docker, and the pyspark-latest image that you created. EMR Notebooks connect to EMR clusters using Apache Livy. The configuration specified in emr-configuration.json configured your EMR cluster’s Spark and Livy instances to use Docker and the pyspark-latest Docker image as the default Docker image for all Spark jobs submitted to this cluster. This allows you to use numpy without having to install it on any cluster nodes. The following section looks at how you can create and use different Docker images for specific notebooks.

Using a custom Docker image for a specific notebook

Individual workloads often require specific versions of library dependencies. To allow individual notebooks to use their own Docker images, you first create a new Docker image and push it to Amazon ECR. You then configure your notebook to use this Docker image instead of the default pyspark-latest image.

Complete the following steps:

  1. Create a new Dockerfile with a specific version of numpy: 1.17.5.
    FROM amazoncorretto:8
    
    RUN yum -y update
    RUN yum -y install yum-utils
    RUN yum -y groupinstall development
    
    RUN yum list python3*
    RUN yum -y install python3 python3-dev python3-pip python3-virtualenv
    
    RUN python -V
    RUN python3 -V
    
    ENV PYSPARK_DRIVER_PYTHON python3
    ENV PYSPARK_PYTHON python3
    
    RUN pip3 install --upgrade pip
    RUN pip3 install 'numpy==1.17.5'
    
    RUN python3 -c "import numpy as np"

  2. Create a directory and a new file named Dockerfile using the following commands:
    $ mkdir numpy-1-17
    $ vi numpy-1-17/Dockerfile

  3. Enter the contents of your new Dockerfile and the following code to build a Docker image:
    $ docker build -t 'local/numpy-1-17' numpy-1-17/

  4. Tag and upload the locally built image to Amazon ECR and replace 123456789123.dkr.ecr.us-east-1.amazonaws.com with your Amazon ECR endpoint using the following commands:
    $ docker tag local/numpy-1-17 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:numpy-1-17 
    $ docker push 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:numpy-1-17

    Now that the numpy-1-17 Docker image is available in Amazon ECR, you can use it with a new notebook.

  1. Create a new Notebook by returning to your EMR Notebook and click File, New, Notebook, and choose the PySpark kernel.To tell your EMR Notebook to use this specific Docker image instead of the default, you need to use the following configuration parameters.
  1. Enter the following code in your notebook (replace 123456789123.dkr.ecr.us-east-1.amazonaws.com with your Amazon ECR endpoint) and choose ▸(Run):
     %%configure -f
    {"conf": 
     { 
      "spark.pyspark.virtualenv.enabled": "false",
      "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE": "123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:numpy-1-17",
      "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE":"123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:numpy-1-17"
     }
    }

    To check if your PySpark code is using version 1.17.5, enter the same PySpark code as before to use numpy and output the version.

  1. Enter the following code into your notebook and choose Run:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("docker-numpy").getOrCreate()
    sc = spark.sparkContext
    
    import numpy as np
    a = np.arange(15).reshape(3, 5)
    print(a)
    print(np.__version__)

The output should look like the following screenshot; the numpy version in use is the version you installed in your numpy-1-17 Docker image: 1.17.5.

Summary

This post showed you how to simplify your Spark dependency management using Amazon EMR 6.0.0 and Docker. You created a Docker image to package your Python dependencies, created a cluster configured to use Docker, and used that Docker image with an EMR Notebook to run PySpark jobs. To find out more about using Docker images with EMR, refer to the EMR documentation on how to Run Spark Applications with Docker Using Amazon EMR 6.0.0. Stay tuned for additional updates on new features and further improvements with Apache Spark on Amazon EMR.

 


About the Author

Paul Codding is a senior product manager for EMR at Amazon Web Services.

 

 

 

 

Suthan Phillips is a big data architect at AWS. He works with customers to provide them architectural guidance and helps them achieve performance enhancements for complex applications on Amazon EMR. In his spare time, he enjoys hiking and exploring the Pacific Northwest.

 

 

Apache Hive is 2x faster with Hive LLAP on EMR 6.0.0

Post Syndicated from Suthan Phillips original https://aws.amazon.com/blogs/big-data/apache-hive-is-2x-faster-with-hive-llap-on-emr-6-0-0/

Customers use Apache Hive with Amazon EMR to provide SQL-based access to petabytes of data stored on Amazon S3. Amazon EMR 6.0.0 adds support for Hive LLAP, providing an average performance speedup of 2x over EMR 5.29, with up to 10x improvement on individual Hive TPC-DS queries. This post shows you how to enable Hive LLAP, and outlines the performance gains we’ve observed using queries from the TPC-DS benchmark.

Twice as fast compared to Amazon EMR 5.29.0

To evaluate the performance benefits of running Hive with Amazon EMR release 6.0.0, we’re using 70 TCP-DS queries with a 3 TB Apache Parquet dataset on a six-node c4.8xlarge EMR cluster to compare the total runtime and geometric mean with results from EMR release 5.29.0.

The results show that the TPC-DS queries run twice as fast in Amazon EMR 6.0.0 (Hive 3.1.2) compared to Amazon EMR 5.29.0 (Hive 2.3.6) with the default Amazon EMR Hive configuration.

The following graph shows performance improvements measured as total runtime for 70 TPC-DS queries. Amazon EMR 6.0.0 has the better (lower) runtime.

The following graph shows performance improvements measured as geometric mean for 70 TPC-DS queries. Amazon EMR 6.0.0 has the better (lower) geometric mean.

The following graph shows the performance improvements on a per-query basis sorted by highest performance gain. In this comparison, the higher numbers are better.

Hive Live Long and Process (LLAP)

Hive LLAP enhances the execution model of Hive, using persistent daemons with dynamic in-memory caching for low-latency queries. These daemons run on the core and task nodes in EMR clusters, caching data and metadata, and avoid the container startup overhead of traditional Hive queries because they are long lived processes.

By default, LLAP daemons do not start as part of EMR cluster start-up. You can enable LLAP in Amazon EMR 6.0.0 with the following hive configuration:

[
  {
    "Classification": "hive",
    "Properties": {
      "hive.llap.enabled": "true"
    }
  }
]

Since Hive LLAP uses persistent daemons that run on YARN, a percentage of the EMR cluster’s YARN resources will be reserved for Hive LLAP daemons when LLAP is enabled. You can override the following properties, which are predefined/calculated by EMR, using the hive configuration when launching an EMR cluster.

PropertyDescriptionDefault
hive.llap.num-instancesDefines the number of LLAP instances to run on the EMR cluster. There can be at max one LLAP instance per Node manager node (Core/Task).Number of Core/Task nodes in the cluster
hive.llap.percent-allocationDefines percentage of YARN NodeManager resources allocated to LLAP instance. This only applies to nodes running an LLAP instance defined by hive.llap.num-instances.0.6 (60%)

For example, to define 80% of YARN NodeManager resources to LLAP, use the following configuration:

 [
      {
        "classification": "hive",
        "properties": {
          "hive.llap.percent-allocation": "0.8",
          "hive.llap.enabled": "true"
        },
        "configurations": []
      }
    ]

You can override the following properties which are predefined/calculated by EMR using hive-site classification when launching an EMR cluster.

PropertyDescription
hive.llap.daemon.yarn.container.mbYARN container size for each LLAP daemon
hive.llap.daemon.memory.per.instance.mbLLAP Xmx
hive.llap.io.memory.sizeSize of LLAP IO cache in an LLAP daemon
hive.llap.daemon.num.executorsNumber of executors (tasks that can execute in parallel) per LLAP daemon

For more details on configuring Hive LLAP in Amazon EMR 6.0.0, please refer to Using Hive LLAP.

Resizing LLAP Daemons

You can modify the number of LLAP instances using the YARN CLI. Here’s a sample command:

$ yarn app -flex <Application Name> -component llap -1

  • Application Name – llap0 (Default)

You can check the status of the Hive LLAP daemons with the following command:

$ hive --service llapstatus

Hive LLAP Web Services

The Hive LLAP Monitor listens on port 15002 in each core and task node running the LLAP daemon.

The following table provides an overview of the Web Services available in LLAP.

URIDescription
http://coretask-public-dns-name:15002Shows the overview of heap, cache, executor and system metrics
http://coretask-public-dns-name:15002/confShows the current LLAP configuration
http://coretask-public-dns-name:15002/peersShows the details of LLAP nodes in the cluster extracted from the Zookeeper server
http://coretask-public-dns-name:15002/iomemShows details about the cache contents and usage
http://coretask-public-dns-name:15002/jmxShows the LLAP daemon’s JVM metrics
http://coretask-public-dns-name:15002/stacksShows JVM stack traces of all threads
http://coretask-public-dns-name:15002/conflogShows the current log levels
http://coretask-public-dns-name:15002/statusShows the status of LLAP

 Hive Performance (LLAP vs Container)

In EMR 6.0.0, Hive LLAP is optional and all Hive queries are executed using dynamically allocated containers when Hive LLAP is disabled. To illustrate the differences of running Hive queries with persistent Hive LLAP daemons versus dynamically allocated containers, we’ve used a subset of the TCP-DS benchmark. The results show an overall performance improvement of 27%, with some queries sped up by up to 4x.

The following graph shows performance improvements measured as total runtime for 50 TPC-DS queries. Amazon EMR 6.0.0 using LLAP has the better (lower) runtime.

The following graph shows performance improvements measured as geometric mean for 50 TPC-DS queries. Amazon EMR 6.0.0 using LLAP has the better (lower) runtime.

The following graph shows the performance improvements on a per-query basis sorted by highest performance gain. In this comparison, the higher numbers are better.

Summary

This post demonstrated the performance improvement of Hive on Amazon EMR 6.0.0 in comparison to the previous Amazon EMR 5.29 release. This improvement in performance helps to reduce query runtime and cost. You also learned about to use Hive LLAP with Amazon EMR 6.0.0, how to configure it, how to view the status and metrics using LLAP Monitor, and saw the performance gains when Hive LLAP is enabled. Stay tuned for additional updates on new features and further improvements in Apache Hive on Amazon EMR.

 


About the Author

Suthan Phillips is a big data architect at AWS. He works with customers to provide them architectural guidance and helps them achieve performance enhancements for complex applications on Amazon EMR. In his spare time, he enjoys hiking and exploring the Pacific Northwest.

 

 

Tune Hadoop and Spark performance with Dr. Elephant and Sparklens on Amazon EMR

Post Syndicated from Nivas Shankar original https://aws.amazon.com/blogs/big-data/tune-hadoop-and-spark-performance-with-dr-elephant-and-sparklens-on-amazon-emr/

Data engineers and ETL developers often spend a significant amount of time running and tuning Apache Spark jobs with different parameters to evaluate performance, which can be challenging and time-consuming. Dr. Elephant and Sparklens help you tune your Spark and Hive applications by monitoring your workloads and providing suggested changes to optimize performance parameters, like required Executor nodes, Core nodes, Driver Memory and Hive (Tez or MapReduce) jobs on Mapper, Reducer, Memory, Data Skew configurations. Dr. Elephant gathers job metrics, runs analysis on them, and presents optimization recommendations in a simple way for easy consumption and corrective actions. Similarly, Sparklens makes it easy to understand the scalability limits of Spark applications and compute resources, and runs efficiently with well-defined methods instead of leaning by trial and error, which saves both developer and compute time.

This post demonstrates how to install Dr. Elephant and Sparklens on an Amazon EMR cluster and run workloads to demonstrate these tools’ capabilities. Amazon EMR is a managed Hadoop service offered by AWS to easily and cost-effectively run Hadoop and other open-source frameworks on AWS.

The following diagram illustrates the solution architecture. Data engineers and ETL developers can submit jobs to Amazon EMR cluster and based on Dr. Elephant and Sparklens tools recommendations , they can optimize their Spark applications and compute resources for better performance and efficiency.

Prerequisite Steps

Creating a new EMR cluster

To configure an EMR cluster with Dr. Elephant or Sparklens, launch an EMR cluster with your desired capacity. This post uses the 10 core nodes of r4.xlarge instances and one master node of r4.4xlarge with the default settings.

You can launch the cluster via the AWS Management Console, an AWS CloudFormation template, or AWS CLI commands. Use the following CloudFormation Stack:

The Following screenshot shows EMR cluster Summary launched from CloudFormation stack.

Enabling Dr. Elephant or Sparklens

If you already have a persistent cluster running, add these steps to enable Dr. Elephant or Sparklens. See the following code:

aws emr add-steps --cluster-id j-XXXXXXXX --steps '[{"Args":["s3://aws-bigdata-blog/artifacts/aws-blog-hadoop-spark-performance-tuning/install-dr-elephant-emr5.sh"],"Type":"CUSTOM_JAR","ActionOnFailure":"CONTINUE","Jar":" s3://elasticmapreduce/libs/script-runner/script-runner.jar","Properties":"","Name":"Install Dr. Elephant and Sparklens"}

Validating access to the Dr. Elephant portal and Sparklens settings

To access Dr. Elephant, direct your browser to the master nodes address on port 8087:

https://<< DNS Name.compute-1.amazonaws.com>>:8087

Note: You need to set up an SSH tunnel to the master node using dynamic or local port forwarding.

The following screenshot shows Dr. Elephant dashboard listed with latest analysis of jobs submitted in EMR cluster.

To validate Sparklens, you need to SSH to the master node. For more information, see Connect to the Master Node Using SSH.

On the console, run the following command:

cd /etc/spark/conf/

Launch PySpark and check that settings are enabled. See the following code:

[[email protected]]$ pyspark

You should see the line qubole#sparklens added as a dependency in the code. The following screenshot shows Sparklens enabled on EMR cluster:

Sparklens – Testing Spark workloads

You can now test Spark workloads in an EMR cluster and observe them through the Sparklens logs.

This post tests a data generator example of 100 billion records using PySpark code and observes how Sparklens helps to optimize and provide recommendations to fine-tune the processing. Complete the following steps:

  1. Copy the code in the EMR cluster.
  2. Navigate to the /home/hadoop/
  3. Enter the following code via test-spark.py:
    from pyspark.sql.functions import rand, randn
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext, SQLContext
    sc = SparkContext.getOrCreate()
    sqlContext = SQLContext(sc)
    
    df = sqlContext.range(0, 100000000000)
    df.count()
    df2 = df.select("id", rand(seed=1000).alias("uniform"), randn(seed=555).alias("normal"))
    row1 = df2.agg({"id": "max"}).collect()[0]
    print row1["max(id)"]
    df2.createOrReplaceTempView("df2")
    df_part1 = spark.sql("select * from df2 where id between 1 and 999999 order by id desc")
    row2 = df_part1.agg({"id": "max"}).collect()[0]
    print row2["max(id)"]
    df_part1.write.format("parquet").mode("overwrite").save("/home/hadoop/data/output1/")
    df_part2 = spark.sql("select * from df2 where id > 10000000 order by id desc")
    row2 = df_part2.agg({"id": "max"}).collect()[0]
    print row2["max(id)"]
    df_part2.write.format("parquet").mode("overwrite").save("/home/hadoop/data/output2/")

  4. Run spark-submit test-spark.py.

The following screenshot shows Sparklens job submission:

The following screenshot shows Sparklens collected application tasks metrics:

 

The following screenshot shows duration of jobs timeline metrics:

The following screenshot shows Sparklens recommendation on minimum possible time execution for the application jobs submitted:

Sparklens suggests the minimum possible runtime for the app resource is 23 seconds, compared to the default settings with an execution time of 50 seconds. Compute hours wasted are 76.15%, and only 30.98% of compute hours used.

You can reduce the executor count and executor core in the spark-submit job and see how it changes the outcome.

Enter the following code:

spark-submit --num-executors 1 --executor-cores 1 test-spark.py

The following screenshot shows Sparklens job application metrics after tuning the job:

The job completion time is reduced to 45 seconds, and only one executor node and one core is sufficient to run the job. It helps identify specific stages (like driver, skew, or lack of tasks) that are limiting Spark application and provides contextual information about what could be going wrong with these stages.

This post tests the preceding estimating Pi example with three natively supported applications—Scala, Java, and Python. For more information, see Write a Spark Application. To run the test, complete the following steps:

  1. Enter the following code:
    import sys
    from random import random
    from operator import add
    
    from pyspark import SparkContext
    
    if __name__ == "__main__":
        """
            Usage: pi [partitions]
        """
        sc = SparkContext(appName="PythonPi")
        partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
        n = 100000 * partitions
    
        def f(_):
            x = random() * 2 - 1
            y = random() * 2 - 1
            return 1 if x ** 2 + y ** 2 < 1 else 0
    
        count = sc.parallelize(xrange(1, n + 1), partitions).map(f).reduce(add)
        print "Pi is roughly %f" % (4.0 * count / n)
    
        sc.stop()

  2. Copy the Spark code example into a local directory.
  3. Run the code as a spark-submit See the following code:spark-submit test-spark2.py

The following screenshot shows Sparklens job submission:

The following screenshot shows Sparklens collected application tasks metrics:

The following screenshot shows Sparklens recommendation on minimum possible time execution for the application jobs submitted:

The following screenshot shows Sparklens metrics on cluster compute utilization:

The following screenshot shows Sparklens recommendation on cluster utilization with core compute nodes and estimated utilization:

In this test, Sparklens suggests recommendations and minimum possible runtime for the app resource is 10 seconds, compared to the default settings with an execution time of 14 seconds. Compute hours wasted are 87.13% and only 12.87% of compute hours used.

In a single run of a Spark application, Sparklens can estimate how your application performs given any arbitrary number of executors. This helps you understand the ROI of adding executors. Internally, Sparklens has the concept of an analyzer, which is a generic component for emitting interesting events. For more information about analyzers, see the GitHub repo.

You can reduce the executor count and executor core in a spark-submit job and see how it changes the outcome. See the following code:

spark-submit --num-executors 2 --executor-cores 2 test-spark2.py

The following screenshot shows Sparklens job submission:

The following screenshot shows Sparklens job application metrics after tuning the jobs:

The job completion time is now reduced to 12 seconds and the job needed only one executor node and core sufficient to process the job.

Dr. Elephant Testing the Hive/MapReduce workloads

You can test scenarios in an EMR cluster and observe them through the Dr. Elephant portal.

Testing Hive load and performance analysis

You can load example datasets through the Hive CLI console and see how the workload performs and what performance optimizations it suggests.

This post demonstrates how to analyze Elastic Load Balancer access logs stored in Amazon S3 using Hive. Complete the following steps:

  1. On the Hive CLI, enter the following code:
    CREATE EXTERNAL TABLE IF NOT EXISTS elb_logs (
    Ts STRING,
    ELBName STRING,
    RequestIP STRING,
    RequestPort INT,
    BackendIP STRING,
    BackendPort INT,
    RequestProcessingTime DOUBLE,
    BackendProcessingTime DOUBLE,
    ClientResponseTime DOUBLE,
    ELBResponseCode STRING,
    BackendResponseCode STRING,
    ReceivedBytes BIGINT,
    SentBytes BIGINT,
    RequestVerb STRING,
    URL STRING,
    Protocol STRING
    )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
    WITH SERDEPROPERTIES (
    "input.regex" = "([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*):([0-9]*) ([.0-9]*) ([.0-9]*) ([.0-9]*) (-|[0-9]*) (-|[0-9]*) ([-0-9]*) ([-0-9]*) \"([^ ]*) ([^ ]*) (- |[^ ]*)\"$"
    ) LOCATION 's3://us-east-1.elasticmapreduce.samples/elb-access-logs/data/';

    The following screenshot shows external table creation through hive:

  2. Run a simple count query and check Dr. Elephant’s recommendations. See the following code:
    SELECT RequestIP, COUNT(RequestIP) FROM elb_logs WHERE BackendResponseCode<>200 GROUP BY RequestIP;

  3. Launch the Dr. Elephant Portal. The following screenshot shows the Dr. Elephant output. The Hive job needs some tuning for Tez mapper memory.
  4. Click on Tez mapper memory section from the application metrics highlighted section.The following screenshot shows that Dr. Elephant recommendation on over-allocated Tez mapper memory:
  5. Run the Hive query with reduced mapper memory. See the following code:
    set hive.tez.container.size=1024;SELECT RequestIP, COUNT(RequestIP) FROM elb_logs WHERE BackendResponseCode<>200 GROUP BY RequestIP

The following screenshot shows application metrics for the submitted jobs:

The following screenshot shows that Dr. Elephant indicates there are no more errors/warning from the query:

Tuning a map reduce job

To tune a map reduce job, run the following Pi code example with the default settings:

hadoop-mapreduce-examples pi -D mapreduce.map.memory.mb=4096 -D mapreduce.reduce.memory.mb=4096 200 1000000

The following screenshot shows application metrics for the submitted jobs:

The following screenshot of Dr. Elephant shows that the mapper time, map memory, and reducer memory need tuning:

The following screenshot shows Dr. Elephant recommendation on improving Mapper Memory:

The following screenshot shows Dr. Elephant recommendation on improving Reducer Memory:

Now, you can set the memory for Mapper and Reducer to the following value:

set mapreduce.map.memory.mb=4096

set mapreduce.reduce.memory.mb=4096

You can reduce the number of mappers and increase the number of samples per mapper to get the same Pi results. See the following code:

hadoop-mapreduce-examples pi -D mapreduce.map.memory.mb=4096 -D mapreduce.reduce.memory.mb=4096 100 2000000

The following screenshot shows improved metrics from recommendation:

The job shows a 50% improvement in efficiency—from approximately 60 seconds to 38 seconds.

You can run with 10 mappers and observe even more improved efficiency. See the following code:

hadoop-mapreduce-examples pi -D mapreduce.map.memory.mb=4096 -D mapreduce.reduce.memory.mb=4096 10 2000000

The following screenshot shows improved metrics from Dr. Elephant recommendation:

Dr. Elephant on an Amazon EMR cluster monitored and provided insights to optimize your Hive and Hadoop jobs.

Configuring your production workloads

To fine-tune the Dr. Elephant tool, navigate to the /mnt/dr-elephant-2.1.7/app-conf directory and edit the configuration files accordingly.

For example, you can write heuristics and plug them into the Dr. Elephant tool to set certain conditions to change in severity based on the number of tasks, and map or reduce numbers deviations from cluster capacity. You can also change the number of threads to analyze the completed jobs, or intervals between fetches from the resource manager.

The following screenshot shows list of Dr. Elephant configuration file for further tuning and customization according to requirements:

For more information about the metrics and configurations, see the GitHub repo.

Conclusion

This post showed how you can launch Dr. Elephant and Sparklens tools on an Amazon EMR cluster and try yourselves on optimizing and performance tuning for both compute and memory-intensive jobs. Dr. Elephant and Sparklens can help you optimize and enable faster job execution times and efficient memory management by using the parallelism of the dataset and optimal compute node usage. It also helps you overcome the challenges of processing many Spark and Hive jobs by adjusting the parallelism of the workload and cluster to meet your demands.

 


About the Author

Nivas Shankar is a Senior Data Architect at Amazon Web Services. He helps and works closely with enterprise customers building data lakes and analytical applications on the AWS platform. He holds a Masters degree in physics and is highly passionate about theoretical physics concepts.

 

 

Mert Hocanin is a big data architect with AWS, covering several products, including EMR, Athena and Managed Blockchain. Prior to working in AWS, he has worked on Amazon.com’s retail business as a Senior Software Development Engineer, building a data lake to process vast amounts of data from all over the company for reporting purposes. When not building and designing data lakes, Mert enjoys traveling and food.

How FactSet automated exporting data from Amazon DynamoDB to Amazon S3 Parquet to build a data analytics platform

Post Syndicated from Arvind Godbole original https://aws.amazon.com/blogs/big-data/how-factset-automated-exporting-data-from-amazon-dynamodb-to-amazon-s3-parquet-to-build-a-data-analytics-platform/

This is a guest post by Arvind Godbole, Lead Software Engineer with FactSet and Tarik Makota, AWS Principal Solutions Architect. In their own words “FactSet creates flexible, open data and software solutions for tens of thousands of investment professionals around the world, which provides instant access to financial data and analytics that investors use to make crucial decisions. At FactSet, we are always working to improve the value that our products provide.”

One area that we’ve been looking into is the relevancy of search results for our clients. Given the wide variety of client use cases and the large number of searches per day, we needed a platform to store anonymized usage data and allow us to analyze that data to boost results using our custom scoring algorithm. Amazon EMR was the obvious choice to host the calculations, but the question arose on how to get our anonymized data into a form that Amazon EMR could use. We worked with AWS and chose to use Amazon DynamoDB to prepare the data for usage in Amazon EMR.

This post walks you through how FactSet takes data from a DynamoDB table and converts that data into Apache Parquet. We store the Parquet files in Amazon S3 to enable near real-time analysis with Amazon EMR. Along the way, we encountered challenges related to data type conversion, which we will explain and show how we were able to overcome these.

Workflow overview

Our workflow contained the following steps:

  1. Anonymized log data is stored into DynamoDB tables. These entries have different fields, depending on how the logs were generated. Whenever we create items in the tables, we use DynamoDB Streams to write out a record. The stream records contain information from a single item in a DynamoDB table.
  2. An AWS Lambda function is hooked into the DynamoDB stream to capture the new items stored in a DynamoDB table. We built our Lambda function off of the lambda-streams-to-firehose project on GitHub to convert the DynamoDB stream image to JSON, which we stringify and push to Amazon Kinesis Data Firehose.
  3. Kinesis Data Firehose transforms the JSON data into Parquet using data contained within an AWS Glue Data Catalog table.
  4. Kinesis Data Firehose stores the Parquet files in S3.
  5. An AWS Glue crawler discovers the schema of DynamoDB items and stores the associated metadata into the Data Catalog.

The following diagram illustrates this workflow.

AWS Glue provides tools to help with data preparation and analysis. A crawler can run on a DynamoDB table to take inventory of the table data and store that information in a Data Catalog. Other services can use the Data Catalog as an index to the location, schema, and types of the table data. There are other ways to add metadata into a Data Catalog, but the key idea is that you can update and modify the metadata easily. For more information, see Populating the AWS Glue Data Catalog.

Problem: Data type disparities

Using a variety of technologies to build a solution often requires mapping and converting data types between these technologies. The cloud is no exception. In our case, log items stored in DynamoDB contained attributes of type String Set. String Set values caused data conversion exceptions when Kinesis tried to transform the data to Parquet. After investigating the problem, we found the following:

  • As the crawler indexes the DynamoDB table, Set data types (StringSet, NumberSet) are stored in the Glue metadata catalog as set<string> and set<bigint>.
  • Kinesis Data Firehose uses that same catalog when it performs the conversion to Apache Parquet. The conversion requires valid Hive data types.
  • set<string> and set<bigint> are not valid Hive data types, so the conversion fails, and an exception is generated. The exception looks similar to the following code:
    [{
       "lastErrorCode": "DataFormatConversion.InvalidSchema",
       "lastErrorMessage": "The schema is invalid. Error parsing the schema: Error: type expected at the position 38 of 'array,used:bigint>>' but 'set' is found."
    }]

Solution: Construct data mapping

While working with the AWS team, we confirmed that the Kinesis Data Firehose converter needs valid Hive data types in the Data Catalog to succeed. When it comes to complex data types, Hive doesn’t support set<data_type>, but it does support the following:

  • ARRAY<data_type>
  • MAP<primitive_type, data_type
  • STRUCT<col_name : data_type [COMMENT col_comment], ...>
  • UNIONTYPE<data_type, data_type, ...>

In our case, this meant that we must convert set<string> and set<bigint> into array<string> and array<bigint>. Our first step was to manually change the types directly in the Data Catalog. After we updated the Data Catalog to change all occurrences of set<data_type> to array<data_type>, the Kinesis transformation to Parquet completed successfully.

Our business case calls for a data store that can store items with different attributes in the same table and the addition of new attributes on-the-fly. We took advantage of DynamoDB’s schema-less nature and ability to scale up and down on-demand so we could focus on our functionality and not the management of the underlying infrastructure. For more information, see Should Your DynamoDB Table Be Normalized or Denormalized?

If our data had a static schema, a manual change would be good enough. Given our business case, a manual solution wasn’t going to scale. Every time we introduced new attributes to the DynamoDB table, we needed to run the crawler, which re-created the metadata and overwrote the change.

Serverless event architecture

To automate the data type updates to the Data Catalog, we used Amazon EventBridge and Lambda to implement the modifications to the data type mapping. EventBridge is a serverless event bus that connects applications using events. An event is a signal that a system’s state has changed, such as the status of a Data Catalog table.

The following diagram shows the previous workflow with the new architecture.

  1. The crawler stays as-is and crawls the DynamoDB table to obtain the metadata.
  2. The metadata obtained by the crawler is stored in the Data Catalog. Previous metadata is updated or removed, and changes (manual or automated) are overwritten.
  3. The event GlueTableChanged in EventBridge listens to any changes to the Data Catalog tables. After we receive the event that there was a change to the table, we trigger the Lambda function.
  4. The Lambda function uses AWS SDK to update the Glue Catalog table using the glue.update_table() API to replace occurrences of set<data_type> with array<data_type>.

To set up EventBridge, we set Event pattern to be “Pre-defined pattern by service”. For service provider, we selected AWS and Glue as service. Event Type we selected “Glue Data Catalog Table State Change”. The following screenshot shows the EventBridge configuration that sends events to the Lambda function that updates the Data Catalog.

The following is the baseline Lambda code:

# This is NOT production worthy code please modify and implement error handling routines as appropriate
import json
import logging
import boto3

glue = boto3.client('glue')

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Define subsegments manually
def table_contains_set(databaseName, tableName):
    
    # returns Glue Catalog description for Table structure
    response = glue.get_table( DatabaseName=databaseName,Name=tableName)
    logger.info(response)  
    
    # loop thru all the Columns of the table 
    isModified = False
    for i in response['Table']['StorageDescriptor']['Columns']: 
        logger.info("## Column: " + str(i['Name']))
        # if Column datatype starts with set< then change it to array<
        if i['Type'].find("set<") != -1:
            i['Type'] = i['Type'].replace("set<", "array<")
            isModified = True
            logger.info(i['Type'])
    
    if isModified:
        # following 3 statements simply clean up the response JSON so that update_table API call works
        del response['Table']['DatabaseName']
        del response['Table']['CreateTime']
        del response['Table']['UpdateTime']
        glue.update_table(DatabaseName=databaseName,TableInput=response['Table'],SkipArchive=True)
        
    logger.info("============ ### =============") 
    logger.info(response)
    
    return True
    
def lambda_handler(event, context):
    logger.info('## EVENT')
    # logger.info(event)
    # This is Sample of the event payload that would be received
    # { 'version': '0', 
    #   'id': '2b402842-21f5-1d76-1a9a-c90076d1d7da', 
    #   'detail-type': 'Glue Data Catalog Table State Change', 
    #   'source': 'aws.glue', 
    #   'account': '1111111111', 
    #   'time': '2019-08-18T02:53:41Z', 
    #   'region': 'us-east-1', 
    #   'resources': ['arn:aws:glue:us-east-1:111111111:table/ddb-glue-fh/ddb_glu_fh_sample'], 
    #   'detail': {
    #           'databaseName': 'ddb-glue-fh', 
    #           'changedPartitions': [], 
    #           'typeOfChange': 'UpdateTable', 
    #           'tableName': 'ddb_glu_fh_sample'
    #    }
    # }
    
    # get the database and table name of the Glue table triggered the event
    databaseName = event['detail']['databaseName']
    tableName = event['detail']['tableName']
    logger.info("DB: " + databaseName + " | Table: " + tableName)
    
    table_contains_set(databaseName, tableName)
   
    # TODO implement and modify
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

The Lambda function is straightforward; this post provides a basic skeleton. You can use this as a template to implement your own functionality for your specific data.

Conclusion

Simple things such as data type conversion and mapping can create unexpected outcomes and challenges when data crosses service boundaries. One of the advantages of AWS is the wide variety of tools with which you can create robust and scalable solutions tailored to your needs. Using event-driven architecture, we solved our data type conversion errors and automated the process to eliminate the issue as we move forward.

 


About the Authors

Arvind Godbole is a Lead Software Engineer at FactSet Research Systems. He has experience in building high-performance, high-availability client facing products and services, ranging from real-time financial applications to search infrastructure. He is currently building an analytics platform to gain insights into client workflows. He holds a B.S. in Computer Engineering from the University of California, San Diego

 

 

 

Tarik Makota is a Principal Solutions Architect with the Amazon Web Services. He provides technical guidance, design advice and thought leadership to AWS’ customers across US Northeast. He holds an M.S. in Software Development and Management from Rochester Institute of Technology.

 

 

How Verizon Media Group migrated from on-premises Apache Hadoop and Spark to Amazon EMR

Post Syndicated from Lev Brailovskiy original https://aws.amazon.com/blogs/big-data/how-verizon-media-group-migrated-from-on-premises-apache-hadoop-and-spark-to-amazon-emr/

This is a guest post by Verizon Media Group.

At Verizon Media Group (VMG), one of the major problems we faced was the inability to scale out computing capacity in a required amount of time—hardware acquisitions often took months to complete. Scaling and upgrading hardware to accommodate workload changes was not economically viable, and upgrading redundant management software required significant downtimes and carried a large amount of risk.

At VMG, we depend on technologies such as Apache Hadoop and Apache Spark to run our data processing pipelines. We previously managed our clusters with Cloudera Manager, which was subject to slow release cycles. As a result, we ran older versions of available open-source releases and couldn’t take advantage of the latest bug fixes and performance improvements on Apache projects. These reasons, combined with our already existing investment in AWS, made us explore migrating our distributed computing pipelines to Amazon EMR.

Amazon EMR is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark.

This post discusses the issues we encountered and solved while building a pipeline to address our data processing needs.

About us

Verizon Media is, ultimately, an online advertising company. Most online advertising today is done through display ads, also known as banners or video ads. Regardless of format, all internet ads usually fire various kinds of beacons to tracking servers, which are usually highly scalable web server deployments with a sole responsibility to log received beacons to one or multiple event sinks.

Pipeline architecture

In our group, which deals mostly with video advertising, we use NGINX web servers deployed in multiple geographical locations, which log events fired from our video player directly to Apache Kafka for real-time processing and to Amazon S3 for batch processing. A typical data pipeline in our group involves processing such input feeds, applying validation and enrichment routines, aggregating resulting data, and replicating it to further destinations for reporting purposes. The following diagram shows a typical pipeline that we created.

We start getting data on our NGINX beacon servers. The data is stored in 1-minute intervals on local disk in gzip files. Every minute, we move the data from NGINX servers to raw data location in S3. Upon landing on S3, the file sends a message to Amazon SQS. Apache NiFi is listening to SQS messages to start working on files. During this time, NiFi groups smaller files into larger files and stores the outcome in a special path on a temporary location on S3. The path name is combined using an inverse timestamp to make sure we store data in a random location to avoid reading bottlenecks.

Every hour, we scale out a Spark cluster on Amazon EMR to process the raw data. This processing includes enriching and validating the data. This data is stored in a permanent location folder on S3 in an Apache ORC columnar format. We also update the AWS Glue Data Catalog to expose this data in Amazon Athena in case we need to investigate it for issues. After raw data processing is finished, we downscale the Spark EMR cluster and start aggregating data based on pre-defined aggregation templates using Presto on Amazon EMR. The aggregated data is stored in ORC format in a special location on S3 for aggregated data.

We also update our Data Catalog with the location of the data so we can query it with Athena. Additionally, we replicate the data from S3 into Vertica for our reporting to expose the data to internal and external customers. In this scenario, we use Athena as the disaster recovery (DR) solution for Vertica. Every time our reporting platform sees that Vertica is in bad health, we automatically fail over to Amazon Athena. This solution proved to be extremely cost-effective for us. We have another use case for Athena in our real-time analytics that we do not discuss in this post.

Migration challenges

Migration to Amazon EMR required us to make some design changes to get the best results. When running big data pipelines on the cloud, operational cost optimization is the name of the game. The two major costs are storage and compute. In traditional on-premises Hadoop warehouses, these are coupled as storage nodes that also serve as computation nodes. This can provide a performance advantage due to data locality. However, the disadvantage of this coupling is that any changes to the storage layer, such as maintenance, can also affect the computational layer. In an environment such as AWS, we can decouple storage and computation by using S3 for storage and Amazon EMR for computation. This provides a major flexibility advantage when dealing with cluster maintenance because all clusters are ephemeral.

To further save costs, we had to figure out how to achieve maximum utilization on our computational layer. This meant that we had to switch our platform to using multiple clusters for different pipelines, where each cluster is automatically scaled depending on the pipeline’s needs.

Switching to S3

Running a Hadoop data warehouse on S3 introduces additional considerations. S3 is not a file system like HDFS and does not provide the same immediate consistency guarantees. You can consider S3 as an eventually consistent object store with a REST API to access it.

The rename

A key difference with S3 is that rename is not an atomic operation. All rename operations on S3 run a copy followed by a delete operation. Executing renames on S3 is undesirable due to running time costs. To use S3 efficiently, you must remove the use of any rename operations. Renames are commonly used in Hadoop warehouses at various commit stages, such as moving a temporary directory to its final destination as an atomic operation. The best approach is to avoid any rename operations and instead write data once.

Output committers

Both Spark and Apache MapReduce jobs have commit stages that commit output files produced by multiple distributed workers to final output directories. Explaining how output committers work is beyond the scope of this post, but the important thing is that standard default output committers designed to work on HDFS depend on rename operations, which as explained previously have a performance penalty on storage systems like S3. A simple strategy that worked for us was disabling speculative execution and switching the output committer’s algorithm version. It is also possible to write your own custom committers, which do not depend on renames. For example, as of Amazon EMR 5.19.0, AWS released a custom OutputCommitter for Spark that optimizes writes to S3.

Eventual consistency

One of the major challenges working with S3 is that it is eventually consistent, whereas HDFS is strongly consistent. S3 does offer read-after-write guarantees for PUTS of new objects, but this is not always enough to build consistent distributed pipelines on. One common scenario that comes up a lot in big data processing is one job outputting a list of files to a directory and another job reading from that directory. For the second job to run, it has to list the directory to find all the files it has to read. In S3, there are no directories; we simply list files with the same prefix, which means you might not see all the new files immediately after your first job is finished running.

To address this issue, AWS offers EMRFS, which is a consistency layer added on top of S3 to make it behave like a consistent file system. EMRFS uses Amazon DynamoDB and keeps metadata about every file on S3. In simple terms, with EMRFS enabled when listing an S3 prefix, the actual S3 response is compared to metadata on DynamoDB. If there’s a mismatch, the S3 driver polls a little longer and waits for data to show up on S3.

In general, we found that EMRFS was necessary to ensure data consistency. For some of our data pipelines, we use PrestoDB to aggregate data that is stored on S3, where we chose to run PrestoDB without EMRFS support. While this has exposed us to the eventual consistency risk for our upstream jobs, we found that we can work around these issues by monitoring for discrepancies between downstream and upstream data and rerunning the upstream jobs if needed. In our experience, consistency issues happen very rarely, but they are possible. If you choose to run without EMRFS, you should design your system accordingly.

Automatic scaling strategies

An important and yet in some ways trivial challenge was figuring out how to take advantage of Amazon EMR automatic scaling capabilities. To achieve optimal operational costs, we want to make sure no server is sitting idle.

To achieve that, the answer might seem obvious—create a long-running EMR cluster and use readily available automatic scaling features to control a cluster’s size based on a parameter, such as available free memory on the cluster. However, some of our batch pipelines start every hour, run for exactly 20 minutes, and are computationally very intensive. Because processing time is very important, we want to make sure we don’t waste any time. The optimal strategy for us is to preemptively resize the cluster through custom scripts before particular big batch pipelines start.

Additionally, it would be difficult to run multiple data pipelines on a single cluster and attempt to keep it at optimal capacity at any given moment because every pipeline is slightly different. We have instead opted to run all our major pipelines on independent EMR clusters. This has a lot of advantages and only a minor disadvantage. The advantages are that each cluster can be resized at exactly the required time, run the software version required by its pipeline, and be managed without affecting other pipelines. The minor disadvantage is that there’s a small amount of computational waste by running extra name nodes and task nodes.

When developing an automatic scaling strategy, we first tried to create and drop clusters every time we need to run our pipelines. However, we quickly found that bootstrapping a cluster from scratch can take more time than we’d like. We instead keep these clusters always running, and we upsize the cluster by adding task nodes before the pipeline starts and remove the task nodes as soon as the pipeline ends. We found that by simply adding task nodes, we can start running our pipelines much faster. If we run into issues with long-running clusters, we can quickly recycle and create a new one from scratch. We continue to work with AWS on these issues.

Our custom automatic scaling scripts are simple Python scripts, which usually run before a pipeline starts. For example, assume that our pipeline consists of a simple MapReduce job with a single mapping and reduce phase. Also assume that the mapping phase is more computationally expensive. We can write a simple script that looks at the amount of data that needs to be processed the next hour and figures out the amount of mappers that are needed to process this data in the same way that a Hadoop job does. When we know the amount of mapping tasks, we can decide how many servers we want to run all the mapper tasks in parallel.

When running Spark real-time pipelines, things are a little trickier because we sometimes have to remove computational resources while the application is running. A simple strategy that worked for us is to create a separate real-time cluster in parallel to the existing one, scale it up to a required size based on amount of data processed during the last hour with some extra capacity, and restart the real-time application on the new cluster.

Operational costs

You can evaluate all AWS costs up front with the EC2 calculator. The main costs when running big data pipelines are storage and computation, with some extra minor costs such as DynamoDB when using EMRFS.

Storage costs

The first cost to consider is storage. Because HDFS has a default replication factor of 3, it would require 3 PB of actual storage capacity instead of 1 PB.

Storing 1 GB on S3 costs ±$0.023 per month. S3 is already highly redundant so you don’t need to take the replication factor into account, which reduces our costs immediately by 67%. You should also consider the other costs for write or read requests, but these usually tend to be small.

Computation costs

The second-largest cost after storage is the cost of computation. To reduce computation costs, you should take advantage of reserved instance pricing as much as possible. An m4.4xlarge instance type with 16 VCPUs on AWS costs $0.301 an hour when it is reserved for 3 years, with all fees up-front. An On-Demand Instance costs $0.8 an hour, which is a 62% difference in price. This is easier to achieve in larger organizations that perform regular capacity planning. An extra hourly fee of $0.24 is added to every Amazon EMR machine for the use of the Amazon EMR platform. It is possible to reduce costs even further by using Amazon EC2 Spot Instances. For more information, see Instance Purchasing Options.

To achieve optimal operational costs, try to make sure that your computation clusters are never sitting idle and try to downscale dynamically based on the amount of work your clusters are doing at any given moment.

Final thoughts

We have been operating our big data pipelines on Amazon EMR for over a year and storing all our data on S3. At times, our real-time processing pipelines have peaked at handling more than 2 million events per second, with a total processing latency from the initial event to updated aggregates of 1 minute. We’ve been enjoying the flexibility around Amazon EMR and its ability to tear down and recreate clusters in a matter of minutes. We are satisfied with the overall stability of the Amazon EMR platform and we will continue working with AWS to improve it.

As we have mentioned before, cost is a major factor to consider, and you could argue that it could be cheaper to run Hadoop in your own data centers. However, this argument hinges on your organization’s ability to do so efficiently; it may have hidden operational costs as well as reduce elasticity. We know through first-hand experience that running on-premises is not an undertaking that you should take lightly and requires a lot of planning and maintenance. We believe that platforms such as Amazon EMR bring a lot of advantages when designing big data systems.

Disclaimer: The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.


About the authors

Lev Brailovskiy is Director of Engineering leading Service Engineering Group in Supply Side Platform (SSP) at Verizon Media. He has over 15 years of experience designing and building software systems. In the past six years, Lev spent time designing, developing, and running large-scale reporting and data processing software both in private Data Centers and in the public Cloud. He can be be contacted via LinkedIn.

 

 

Zilvinas Shaltys is Technical Lead for the Video Syndication cloud data warehouse platform at Verizon. Zilvinas has years of experience working with a wide variety of big data technologies deployed at considerable scale. He was responsible for migrating big data pipelines from AOL data centers to Amazon EMR. Zilvinas is currently working on improving stability and scalability of existing batch and realtime big data systems. He can be contacted via LinkedIn.

 

Provisioning the Intuit Data Lake with Amazon EMR, Amazon SageMaker, and AWS Service Catalog

Post Syndicated from Michael Sambol original https://aws.amazon.com/blogs/big-data/provisioning-the-intuit-data-lake-with-amazon-emr-amazon-sagemaker-and-aws-service-catalog/

This post shares Intuit’s learnings and recommendations for running a data lake on AWS. The Intuit Data Lake is built and operated by numerous teams in Intuit Data Platform. Thanks to Tristan Baker (Chief Architect), Neil Lamka (Principal Product Manager), Achal Kumar (Development Manager), Nicholas Audo, and Jimmy Armitage for their feedback and support.

A data lake is a centralized repository for storing structured and unstructured data at any scale. At Intuit, creating such a pile of raw data is easy. However, more interesting challenges present themselves:

  1. How should AWS accounts be organized?
  2. What ingestion methods will be used? How will analysts find the data they need?
  3. Where should data be stored? How should access be managed?
  4. What security measures are needed to protect Intuit’s sensitive data?
  5. Which parts of this ecosystem can be automated?

This post outlines the approach taken by Intuit, though it is important to remember that there are many ways to build a data lake (for example, AWS Lake Formation).

We’ll cover the technologies and processes involved in creating the Intuit Data Lake at a high level, including the overall structure and the automation used in provisioning accounts and resources. Watch this space in the future for more detailed blog posts on specific aspects of the system, from the other teams and engineers who worked together to build the Intuit Data Lake.

Architecture

Account Structure

Data lakes typically follow a hub-and-spoke model, with the hub account containing shared services that control access to data sources. For the purposes of this post, we’ll refer to the hub account as Central Data Lake.

In this pattern, access to Central Data Lake is apportioned to spoke accounts called Processing Accounts. This model maintains separation between end users and allows for division of billing among distinct business units.

 

 

It is common to maintain two ecosystems: pre-production (Pre-Prod) and production (Prod). This allows data lake administrators to silo access to data by preventing connectivity between Pre-Prod and Prod.

To enable experimentation and testing, it may also be advisable to maintain separate VPC-based environments within Pre-Prod accounts, such as dev, qa, and e2e. Processing Account VPCs would then be connected to the corresponding VPC in Central Data Lake.

Note that at first, we connected accounts via VPC Peering. However, as we scaled we quickly approached the hard limit of 125 VPC peering connections, requiring us to migrate to AWS Transit Gateway. As of this writing, we connect multiple new Processing Accounts weekly.

 

 

Central Data Lake

There may be numerous services running in a hub account, but we’ll focus on the aspects that are most relevant to this blog: ingestion, sanitization, storage, and a data catalog.

 

 

Ingestion, Sanitization, and Storage

A key component to Central Data Lake is a uniform ingestion pattern for streaming data. One example is an Apache Kafka cluster running on Amazon EC2. (You can read about how Intuit engineers do this in another AWS blog.) As we deal with hundreds of data sources, we’ve enabled access to ingestion mechanisms via AWS PrivateLink.

Note: Amazon Managed Streaming for Apache Kafka (Amazon MSK) is an alternative for running Apache Kafka on Amazon EC2, but was not available at the start of Intuit’s migration.

In addition to stream processing, another method of ingestion is batch processing, such as jobs running on Amazon EMR. After data is ingested by one of these methods, it can be stored in Amazon S3 for further processing and analysis.

Intuit deals with a large volume of customer data, and each field is carefully considered and classified with a sensitivity level. All sensitive data that enters the lake is encrypted at the source. The ingestion systems retrieve the encrypted data and move it into the lake. Before it is written to S3, the data is sanitized by a proprietary RESTful service. Analysts and engineers operating within the data lake consume this masked data.

Data Catalog

A data catalog is a common way to give end users information about the data and where it lives. One example is a Hive Metastore backed by Amazon Aurora. Another alternative is the AWS Glue Data Catalog.

Processing Accounts

When Processing Accounts are delivered to end users, they include an identical set of resources. We’ll discuss the automation of Processing Accounts below, but the primary components are as follows:

 

 

                           Processing Account structure upon delivery to the customer

 

Data Storage Mechanisms

One reasonable question is whether all data should reside in Central Data Lake, or if it’s acceptable to distribute data across multiple accounts. A data lake might employ a combination of the two approaches, and classify data locations as primary or secondary.

The primary location for data is Central Data Lake, and it arrives there via the ingestion pipelines discussed previously. Processing Accounts can read from the primary source, either directly from the ingestion pipelines or from S3. Processing Accounts can contribute their transformed data back into Central Data Lake (primary), or store it in their own accounts (secondary). The proper storage location depends on the type of data, and who needs to consume it.

One rule worth enforcing is that no cross-account writes should be permitted. In other words, the IAM principal (in most cases, an IAM role assumed by EC2 via an instance profile) must be in the same account as the destination S3 bucket. This is because cross-account delegation is not supported—specifically, S3 bucket policies in Central Data Lake cannot grant Processing Account A access to objects written by a role in Processing Account B.

Another possibility is for EMR to assume different IAM roles via a custom credentials provider (see this AWS blog), but we chose not to go down this path at Intuit because it would have required many EMR jobs to be rewritten.

 

 

Data Access Patterns

The majority of end users are interested in the data that resides in S3. In Central Data Lake and some Processing Accounts, there may be a set of read-only S3 buckets: any account in the data lake ecosystem can read data from this type of bucket.

To facilitate management of S3 access for read-only buckets, we built a mechanism to control S3 bucket policies, administered entirely via code. Our deployment pipelines use account metadata to dynamically generate the correct S3 bucket policy based on the type of account (Pre-Prod or Prod). These policies are committed back into our code repository for auditability and ease of management.

We employ the same method for managing KMS key policies, as we use KMS with customer managed customer master keys (CMKs) for at-rest encryption in S3.

Here’s an example of a generated S3 bucket policy for a read-only bucket:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ProcessingAccountReadOnly",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::111111111111:root",
                    "arn:aws:iam::222222222222:root",
                    "arn:aws:iam::333333333333:root",
                    "arn:aws:iam::444444444444:root",
                    "arn:aws:iam::555555555555:root",
                    ...
                    ...
                    ...
                    "arn:aws:iam::999999999999:root",
                ]
            },
            "Action": [
                "s3:ListBucket",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::intuit-data-lake-example/*",
                "arn:aws:s3:::intuit-data-lake-example"
            ]
        }
    ]
}

Note that we grant access at the account level, rather than using explicit IAM principal ARNs. Because the reads are cross-account, permissions are also required on the IAM principals in Processing Accounts. Maintaining these policies—with automation, at that level of granularity—is untenable at scale. Furthermore, using specific IAM principal ARNs would create an external dependency on foreign accounts. For example, if a Processing Account deletes an IAM role that is referenced in an S3 bucket policy in Central Data Lake, the bucket policy can no longer be saved, causing interruptions to deployment pipelines.

Security

Security is mission critical for any data lake. We’ll mention a subset of the controls we use, but not dive deep.

Encryption

Encryption can be enforced both in transit and at rest, using multiple methods:

  1. Traffic within the lake should use the latest version of TLS (1.2 as of this writing)
  2. Data can be encrypted with application-level (client-side) encryption
  3. KMS keys can used for at-rest encryption of S3, EBS, and RDS

Ingress and Egress

There’s nothing out of the ordinary in our approach to ingress and egress, but it’s worth mentioning the standard patterns we’ve found important:

Policies restricting ingress and egress are the primary points at which a data lake can guarantee quality (ingress) and prevent loss (egress).

Authorization

Access to the Intuit Data Lake is controlled via IAM roles, meaning no IAM users (with long-term credentials) are created. End users are granted access via an internal service that manages role-based, federated access to AWS accounts. Regular reviews are conducted to remove nonessential users.

Configuration Management

We use an internal fork of Cloud Custodian, which is a suite of preventative, detective, and responsive controls consisting of Amazon CloudWatch Events and AWS Config rules. Some of the violations it reports and (optionally) mitigates include:

  • Unauthorized CIDRs in inbound security group rules
  • Public S3 bucket policies and ACLs
  • IAM user console access
  • Unencrypted S3 buckets, EBS volumes, and RDS instances

Lastly, Amazon GuardDuty is enabled in all Intuit Data Lake accounts and is monitored by Intuit Security.

Automation

If there is one thing we’ve learned building the Intuit Data Lake, it is to automate everything.

There are four areas of automation we’ll discuss in this blog:

  1. Creation of Processing Accounts
  2. Processing Account Orchestration Pipeline
  3. Processing Account Terraform Pipeline
  4. EMR and SageMaker deployment via Service Catalog

Creation of Processing Accounts

The first step in creating a Processing Account is to make a request through an internal tool. This triggers automation that provisions an Intuit-stamped AWS account under the correct business unit.

 

Note: AWS Control Tower’s Account Factory was not available at the start of our journey, but it can be leveraged to provision new AWS accounts in a secured, best practice, self-service way.

Account setup also includes automated VPC creation (with optional VPN), fully automated using Service Catalog. End users simply specify subnet sizes.

It’s worth noting that Intuit leverages Service Catalog for self-service deployment of other common patterns, including ingress security groups, VPC endpoints, and VPC peering. Here’s an example portfolio:

Processing Account Orchestration Pipeline

After account creation and VPC provisioning, the Processing Account Orchestration Pipeline runs. This pipeline executes one-time tasks required for Processing Accounts. These tasks include:

  • Bootstrapping an IAM role for use in further configuration management
  • Creation of KMS keys for S3, EBS, and RDS encryption
  • Creation of variable files for the new account
  • Updating the master configuration file with account metadata
  • Generation of scripts to orchestrate the Terraform pipeline discussed below
  • Sharing Transit Gateways via Resource Access Manager

Processing Account Terraform Pipeline

This pipeline manages the lifecycle of dynamic, frequently-updated resources, including IAM roles, S3 buckets and bucket policies, KMS key policies, security groups, NACLs, and bastion hosts.

There is one pipeline for every Processing Account, and each pipeline deploys a series of layers into the account, using a set of parameterized deployment jobs. A layer is a logical grouping of Terraform modules and AWS resources, providing a way to shrink Terraform state files and reduce blast radius if redeployment of specific resources is required.

EMR and SageMaker Deployment via Service Catalog

AWS Service Catalog facilitates the provisioning of Amazon EMR and Amazon SageMaker, allowing end users to launch EMR clusters and SageMaker notebook instances that work out of the box, with embedded security.

Service Catalog allows data scientists and data engineers to launch EMR clusters in a self-service fashion with user-friendly parameters, and provides them with the following:

  • Bootstrap action to enable connectivity to services in Central Data Lake
  • EC2 instance profile to control S3, KMS, and other granular permissions
  • Security configuration that enables at-rest and in-transit encryption
  • Configuration classifications for optimal EMR performance
  • Encrypted AMI with monitoring and logging enabled
  • Custom Kerberos connection to LDAP

For SageMaker, we use Service Catalog to launch notebook instances with custom lifecycle configurations that set up connections or initialize the following: Hive Metastore, Kerberos, security, Splunk logging, and OpenDNS. You can read more about lifecycle configurations in this AWS blog. Launching a SageMaker notebook instance with best-practice configuration is as easy as follows:

 

 

Conclusion

This post illustrates the building blocks we used in creating the Intuit Data Lake. Our solution isn’t wholly unique, but comprised of common-sense approaches we’ve gleaned from dozens of engineers across Intuit, representing decades of experience. These practices have enabled us to push petabytes of data into the lake, and serve hundreds of Processing Accounts with varying needs. We are still building, but we hope our story helps you in your data lake journey.

The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.

 


About the Authors

Michael Sambol is a senior consultant at AWS. He holds an MS in computer science from Georgia Tech. Michael enjoys working out, playing tennis, traveling, and watching Western movies.

 

 

 

 

Ben Covi is a staff software engineer at Intuit. At any given moment, he’s probably losing a game of Catan.

 

 

 

New – Using Step Functions to Orchestrate Amazon EMR workloads

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-using-step-functions-to-orchestrate-amazon-emr-workloads/

AWS Step Functions allows you to add serverless workflow automation to your applications. The steps of your workflow can run anywhere, including in AWS Lambda functions, on Amazon Elastic Compute Cloud (EC2), or on-premises. To simplify building workflows, Step Functions is directly integrated with multiple AWS Services: Amazon ECS, AWS Fargate, Amazon DynamoDB, Amazon Simple Notification Service (SNS), Amazon Simple Queue Service (SQS), AWS Batch, AWS Glue, Amazon SageMaker, and (to run nested workflows) with Step Functions itself.

Starting today, Step Functions connects to Amazon EMR, enabling you to create data processing and analysis workflows with minimal code, saving time, and optimizing cluster utilization. For example, building data processing pipelines for machine learning is time consuming and hard. With this new integration, you have a simple way to orchestrate workflow capabilities, including parallel executions and dependencies from the result of a previous step, and handle failures and exceptions when running data processing jobs.

Specifically, a Step Functions state machine can now:

  • Create or terminate an EMR cluster, including the possibility to change the cluster termination protection. In this way, you can reuse an existing EMR cluster for your workflow, or create one on-demand during execution of a workflow.
  • Add or cancel an EMR step for your cluster. Each EMR step is a unit of work that contains instructions to manipulate data for processing by software installed on the cluster, including tools such as Apache Spark, Hive, or Presto.
  • Modify the size of an EMR cluster instance fleet or group, allowing you to manage scaling programmatically depending on the requirements of each step of your workflow. For example, you may increase the size of an instance group before adding a compute-intensive step, and reduce the size just after it has completed.

When you create or terminate a cluster or add an EMR step to a cluster, you can use synchronous integrations to move to the next step of your workflow only when the corresponding activity has completed on the EMR cluster.

Reading the configuration or the state of your EMR clusters is not part of the Step Functions service integration. In case you need that, the EMR List* and Describe* APIs can be accessed using Lambda functions as tasks.

Building a Workflow with EMR and Step Functions
On the Step Functions console, I create a new state machine. The console renders it visually, so that is much easier to understand:

To create the state machine, I use the following definition using the Amazon States Language (ASL):

{
  "StartAt": "Should_Create_Cluster",
  "States": {
    "Should_Create_Cluster": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.CreateCluster",
          "BooleanEquals": true,
          "Next": "Create_A_Cluster"
        },
        {
          "Variable": "$.CreateCluster",
          "BooleanEquals": false,
          "Next": "Enable_Termination_Protection"
        }
      ],
      "Default": "Create_A_Cluster"
    },
    "Create_A_Cluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
      "Parameters": {
        "Name": "WorkflowCluster",
        "VisibleToAllUsers": true,
        "ReleaseLabel": "emr-5.28.0",
        "Applications": [{ "Name": "Hive" }],
        "ServiceRole": "EMR_DefaultRole",
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "LogUri": "s3://aws-logs-123412341234-eu-west-1/elasticmapreduce/",
        "Instances": {
          "KeepJobFlowAliveWhenNoSteps": true,
          "InstanceFleets": [
            {
              "InstanceFleetType": "MASTER",
              "TargetOnDemandCapacity": 1,
              "InstanceTypeConfigs": [
                {
                  "InstanceType": "m4.xlarge"
                }
              ]
            },
            {
              "InstanceFleetType": "CORE",
              "TargetOnDemandCapacity": 1,
              "InstanceTypeConfigs": [
                {
                  "InstanceType": "m4.xlarge"
                }
              ]
            }
          ]
        }
      },
      "ResultPath": "$.CreateClusterResult",
      "Next": "Merge_Results"
    },
    "Merge_Results": {
      "Type": "Pass",
      "Parameters": {
        "CreateCluster.$": "$.CreateCluster",
        "TerminateCluster.$": "$.TerminateCluster",
        "ClusterId.$": "$.CreateClusterResult.ClusterId"
      },
      "Next": "Enable_Termination_Protection"
    },
    "Enable_Termination_Protection": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection",
      "Parameters": {
        "ClusterId.$": "$.ClusterId",
        "TerminationProtected": true
      },
      "ResultPath": null,
      "Next": "Add_Steps_Parallel"
    },
    "Add_Steps_Parallel": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Step_One",
          "States": {
            "Step_One": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
              "Parameters": {
                "ClusterId.$": "$.ClusterId",
                "Step": {
                  "Name": "The first step",
                  "ActionOnFailure": "CONTINUE",
                  "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                      "hive-script",
                      "--run-hive-script",
                      "--args",
                      "-f",
                      "s3://eu-west-1.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q",
                      "-d",
                      "INPUT=s3://eu-west-1.elasticmapreduce.samples",
                      "-d",
                      "OUTPUT=s3://MY-BUCKET/MyHiveQueryResults/"
                    ]
                  }
                }
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "Wait_10_Seconds",
          "States": {
            "Wait_10_Seconds": {
              "Type": "Wait",
              "Seconds": 10,
              "Next": "Step_Two (async)"
            },
            "Step_Two (async)": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:addStep",
              "Parameters": {
                "ClusterId.$": "$.ClusterId",
                "Step": {
                  "Name": "The second step",
                  "ActionOnFailure": "CONTINUE",
                  "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                      "hive-script",
                      "--run-hive-script",
                      "--args",
                      "-f",
                      "s3://eu-west-1.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q",
                      "-d",
                      "INPUT=s3://eu-west-1.elasticmapreduce.samples",
                      "-d",
                      "OUTPUT=s3://MY-BUCKET/MyHiveQueryResults/"
                    ]
                  }
                }
              },
              "ResultPath": "$.AddStepsResult",
              "Next": "Wait_Another_10_Seconds"
            },
            "Wait_Another_10_Seconds": {
              "Type": "Wait",
              "Seconds": 10,
              "Next": "Cancel_Step_Two"
            },
            "Cancel_Step_Two": {
              "Type": "Task",
              "Resource": "arn:aws:states:::elasticmapreduce:cancelStep",
              "Parameters": {
                "ClusterId.$": "$.ClusterId",
                "StepId.$": "$.AddStepsResult.StepId"
              },
              "End": true
            }
          }
        }
      ],
      "ResultPath": null,
      "Next": "Step_Three"
    },
    "Step_Three": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
      "Parameters": {
        "ClusterId.$": "$.ClusterId",
        "Step": {
          "Name": "The third step",
          "ActionOnFailure": "CONTINUE",
          "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
              "hive-script",
              "--run-hive-script",
              "--args",
              "-f",
              "s3://eu-west-1.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q",
              "-d",
              "INPUT=s3://eu-west-1.elasticmapreduce.samples",
              "-d",
              "OUTPUT=s3://MY-BUCKET/MyHiveQueryResults/"
            ]
          }
        }
      },
      "ResultPath": null,
      "Next": "Disable_Termination_Protection"
    },
    "Disable_Termination_Protection": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection",
      "Parameters": {
        "ClusterId.$": "$.ClusterId",
        "TerminationProtected": false
      },
      "ResultPath": null,
      "Next": "Should_Terminate_Cluster"
    },
    "Should_Terminate_Cluster": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.TerminateCluster",
          "BooleanEquals": true,
          "Next": "Terminate_Cluster"
        },
        {
          "Variable": "$.TerminateCluster",
          "BooleanEquals": false,
          "Next": "Wrapping_Up"
        }
      ],
      "Default": "Wrapping_Up"
    },
    "Terminate_Cluster": {
      "Type": "Task",
      "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
      "Parameters": {
        "ClusterId.$": "$.ClusterId"
      },
      "Next": "Wrapping_Up"
    },
    "Wrapping_Up": {
      "Type": "Pass",
      "End": true
    }
  }
}

I let the Step Functions console create a new AWS Identity and Access Management (IAM) role for the executions of this state machine. The role automatically includes all permissions required to access EMR.

This state machine can either use an existing EMR cluster, or create a new one. I can use the following input to create a new cluster that is terminated at the end of the workflow:

{
"CreateCluster": true,
"TerminateCluster": true
}

To use an existing cluster, I need to provide input in the cluster ID, using this syntax:

{
"CreateCluster": false,
"TerminateCluster": false,
"ClusterId": "j-..."
}

Let’s see how that works. As the workflow starts, the Should_Create_Cluster Choice state looks into the input to decide if it should enter the Create_A_Cluster state or not. There, I use a synchronous call (elasticmapreduce:createCluster.sync) to wait for the new EMR cluster to reach the WAITING state before progressing to the next workflow state. The AWS Step Functions console shows the resource that is being created with a link to the EMR console:

After that, the Merge_Results Pass state merges the input state with the cluster ID of the newly created cluster to pass it to the next step in the workflow.

Before starting to process any data, I use the Enable_Termination_Protection state (elasticmapreduce:setClusterTerminationProtection) to help ensure that the EC2 instances in my EMR cluster are not shut down by an accident or error.

Now I am ready to do something with the EMR cluster. I have three EMR steps in the workflow. For the sake of simplicity, these steps are all based on this Hive tutorial. For each step, I use Hive’s SQL-like interface to run a query on some sample CloudFront logs and write the results to Amazon Simple Storage Service (S3). In a production use case, you’d probably have a combination of EMR tools processing and analyzing your data in parallel (two or more steps running at the same time) or with some dependencies (the output of one step is required by another step). Let’s try to do something similar.

First I execute Step_One and Step_Two inside a Parallel state:

  • Step_One is running the EMR step synchronously as a job (elasticmapreduce:addStep.sync). That means that the execution waits for the EMR step to be completed (or cancelled) before moving on to the next step in the workflow. You can optionally add a timeout to monitor that the execution of the EMR step happens within an expected time frame.
  • Step_Two is adding an EMR step asynchronously (elasticmapreduce:addStep). In this case, the workflow moves to the next step as soon as EMR replies that the request has been received. After a few seconds, to try another integration, I cancel Step_Two (elasticmapreduce:cancelStep). This integration can be really useful in production use cases. For example, you can cancel an EMR step if you get an error from another step running in parallel that would make it useless to continue with the execution of this step.

After those two steps have both completed and produce their results, I execute Step_Three as a job, similarly to what I did for Step_One. When Step_Three has completed, I enter the Disable_Termination_Protection step, because I am done using the cluster for this workflow.

Depending on the input state, the Should_Terminate_Cluster Choice state is going to enter the Terminate_Cluster state (elasticmapreduce:terminateCluster.sync) and wait for the EMR cluster to terminate, or go straight to the Wrapping_Up state and leave the cluster running.

Finally I have a state for Wrapping_Up. I am not doing much in this final state actually, but you can’t end a workflow from a Choice state.

In the EMR console I see the status of my cluster and of the EMR steps:

Using the AWS Command Line Interface (CLI), I find the results of my query in the S3 bucket configured as output for the EMR steps:

aws s3 ls s3://MY-BUCKET/MyHiveQueryResults/
...

Based on my input, the EMR cluster is still running at the end of this workflow execution. I follow the resource link in the Create_A_Cluster step to go to the EMR console and terminate it. In case you are following along with this demo, be careful to not leave your EMR cluster running if you don’t need it.

Available Now
Step Functions integration with EMR is available in all regions. There is no additional cost for using this feature on top of the usual Step Functions and EMR pricing.

You can now use Step Functions to quickly build complex workflows for executing EMR jobs. A workflow can include parallel executions, dependencies, and exception handling. Step Functions makes it easy to retry failed jobs and terminate workflows after critical errors, because you can specify what happens when something goes wrong. Let me know what are you going to use this feature for!

Danilo

Amazon EMR introduces EMR runtime for Apache Spark

Post Syndicated from Joseph Marques original https://aws.amazon.com/blogs/big-data/amazon-emr-introduces-emr-runtime-for-apache-spark/

Amazon EMR is happy to announce Amazon EMR runtime for Apache Spark, a performance-optimized runtime environment for Apache Spark that is active by default on Amazon EMR clusters. EMR runtime for Spark is up to 32 times faster than EMR 5.16, with 100% API compatibility with open-source Spark. This means that your workloads run faster, saving you compute costs without making any changes to your applications.

Amazon EMR has been adding Spark runtime improvements since EMR 5.24, and discussed them in Optimizing Spark Performance. EMR 5.28 features several new improvements.

To measure these improvements, we compared EMR 5.16 (with open source Apache Spark version 2.4) with EMR 5.28 (with EMR runtime for Apache Spark compatible with Apache Spark version 2.4). We used TPC-DS benchmark queries with 3 TB scale and ran them on a six-node c4.8xlarge EMR cluster with data in Amazon S3. We measured performance improvements as the geometric mean of improvement in total query execution time, and the total query execution time across all queries. The results showed considerable improvement—that the geometric mean was 2.4 times faster and the total query runtime was 3.2 times faster.

The following graph shows performance improvements measured as total runtime for 104 TPC-DS queries. EMR 5.28 has the better (lower) runtime.

The following graph shows performance improvements measured as the geometric mean for 104 TPC-DS queries. EMR 5.28 has the better (lower) geomean.

In breaking down the per-query improvements, you can observe the highest performance gains in long-running queries.

The following graph shows performance improvements in EMR 5.28 compared to EMR 5.16 for long-running queries (running for more than 130 seconds in EMR 5.16). In this comparison, the higher numbers are better.

The following graph shows performance improvements in EMR 5.28 compared to EMR 5.16 for short-running queries (running for less than 130 seconds). Again, the higher numbers are better.

Queries running for more than 130 seconds are up to 32 times faster as seen in query 72. Queries running for less than 130 seconds are up to 6 times faster, with an average improvement of 2 times faster across the board.

Customers use Spark for a wide array of analytics use cases ranging from large-scale transformations to streaming, data science, and machine learning. They choose to run Spark on EMR because EMR provides the latest, stable, open-source community innovations, performant storage with Amazon S3, and the unique cost savings capabilities of Spot Instances and Auto Scaling. It also provides ease of use with managed EMR Notebooks, notebook-scoped libraries, Git integration, and easy debugging and monitoring with off-cluster Spark History Services. Combined with the runtime improvements, and fine-grained access control using AWS Lake Formation, Amazon EMR presents an excellent choice for customers running Apache Spark.

With each of these performance optimizations to Apache Spark, you benefit from better query performance. Stay tuned for additional updates that improve Apache Spark performance in Amazon EMR. To keep up to date, subscribe to the Big Data blog’s RSS feed to learn about more Apache Spark optimizations, configuration best practices, and tuning advice.

 


About the Authors

Joseph Marques is a principal engineer for EMR at Amazon Web Services.

 

 

 

 

Peter Gvozdjak is a senior engineering manager for EMR at Amazon Web Services.

 

 

 

 

New – Insert, Update, Delete Data on S3 with Amazon EMR and Apache Hudi

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-insert-update-delete-data-on-s3-with-amazon-emr-and-apache-hudi/

Storing your data in Amazon S3 provides lots of benefits in terms of scale, reliability, and cost effectiveness. On top of that, you can leverage Amazon EMR to process and analyze your data using open source tools like Apache Spark, Hive, and Presto. As powerful as these tools are, it can still be challenging to deal with use cases where you need to do incremental data processing, and record-level insert, update, and delete.

Talking with customers, we found that there are use cases that need to handle incremental changes to individual records, for example:

  • Complying with data privacy regulations, where their users choose to exercise their right to be forgotten, or change their consent as to how their data can be used.
  • Working with streaming data, when you have to handle specific data insertion and update events.
  • Using change data capture (CDC) architectures to track and ingest database change logs from enterprise data warehouses or operational data stores.
  • Reinstating late arriving data, or analyzing data as of a specific point in time.

Starting today, EMR release 5.28.0 includes Apache Hudi (incubating), so that you no longer need to build custom solutions to perform record-level insert, update, and delete operations. Hudi development started in Uber in 2016 to address inefficiencies across ingest and ETL pipelines. In the recent months the EMR team has worked closely with the Apache Hudi community, contributing patches that include updating Hudi to Spark 2.4.4 (HUDI-12), supporting Spark Avro (HUDI-91), adding support for AWS Glue Data Catalog (HUDI-306), as well as multiple bug fixes.

Using Hudi, you can perform record-level inserts, updates, and deletes on S3 allowing you to comply with data privacy laws, consume real time streams and change data captures, reinstate late arriving data and track history and rollbacks in an open, vendor neutral format. You create datasets and tables and Hudi manages the underlying data format. Hudi uses Apache Parquet, and Apache Avro for data storage, and includes built-in integrations with Spark, Hive, and Presto, enabling you to query Hudi datasets using the same tools that you use today with near real-time access to fresh data.

When launching an EMR cluster, the libraries and tools for Hudi are installed and configured automatically any time at least one of the following components is selected: Hive, Spark, or Presto. You can use Spark to create new Hudi datasets, and insert, update, and delete data. Each Hudi dataset is registered in your cluster’s configured metastore (including the AWS Glue Data Catalog), and appears as a table that can be queried using Spark, Hive, and Presto.

Hudi supports two storage types that define how data is written, indexed, and read from S3:

  • Copy on Write – data is stored in columnar format (Parquet) and updates create a new version of the files during writes. This storage type is best used for read-heavy workloads, because the latest version of the dataset is always available in efficient columnar files.
  • Merge on Read – data is stored with a combination of columnar (Parquet) and row-based (Avro) formats; updates are logged to row-based “delta files” and compacted later creating a new version of the columnar files. This storage type is best used for write-heavy workloads, because new commits are written quickly as delta files, but reading the data set requires merging the compacted columnar files with the delta files.

Let’s do a quick overview of how you can set up and use Hudi datasets in an EMR cluster.

Using Apache Hudi with Amazon EMR
I start creating a cluster from the EMR console. In the advanced options I select EMR release 5.28.0 (the first including Hudi) and the following applications: Spark, Hive, and Tez. In the hardware options, I add 3 task nodes to ensure I have enough capacity to run both Spark and Hive.

When the cluster is ready, I use the key pair I selected in the security options to SSH into the master node and access the Spark Shell. I use the following command to start the Spark Shell to use it with Hudi:

$ spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
              --conf "spark.sql.hive.convertMetastoreParquet=false"
              --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

There, I use the following Scala code to import some sample ELB logs in a Hudi dataset using the Copy on Write storage type:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor

//Set up various input values as variables
val inputDataPath = "s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/"
val hudiTableName = "elb_logs_hudi_cow"
val hudiTablePath = "s3://MY-BUCKET/PATH/" + hudiTableName

// Set up our Hudi Data Source Options
val hudiOptions = Map[String,String](
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "request_ip",
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "request_verb", 
    HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
    DataSourceWriteOptions.OPERATION_OPT_KEY ->
        DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "request_timestamp", 
    DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", 
    DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, 
    DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "request_verb", 
    DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false", 
    DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
        classOf[MultiPartKeysValueExtractor].getName)

// Read data from S3 and create a DataFrame with Partition and Record Key
val inputDF = spark.read.format("parquet").load(inputDataPath)

// Write data into the Hudi dataset
inputDF.write
       .format("org.apache.hudi")
       .options(hudiOptions)
       .mode(SaveMode.Overwrite)
       .save(hudiTablePath)

In the Spark Shell, I can now count the records in the Hudi dataset:

scala> inputDF2.count()
res1: Long = 10491958

In the options, I used the integration with the Hive metastore configured for the cluster, so that the table is created in the default database. In this way, I can use Hive to query the data in the Hudi dataset:

hive> use default;
hive> select count(*) from elb_logs_hudi_cow;
...
OK
10491958
...

I can now update or delete a single record in the dataset. In the Spark Shell, I prepare some variables to find the record I want to update, and a SQL statement to select the value of the column I want to change:

val requestIpToUpdate = "243.80.62.181"
val sqlStatement = s"SELECT elb_name FROM elb_logs_hudi_cow WHERE request_ip = '$requestIpToUpdate'"

I execute the SQL statement to see the current value of the column:

scala> spark.sql(sqlStatement).show()
+------------+                                                                  
|    elb_name|
+------------+
|elb_demo_003|
+------------+

Then, I select and update the record:

// Create a DataFrame with a single record and update column value
val updateDF = inputDF.filter(col("request_ip") === requestIpToUpdate)
                      .withColumn("elb_name", lit("elb_demo_001"))

Now I update the Hudi dataset with a syntax similar to the one I used to create it. But this time, the DataFrame I am writing contains only one record:

// Write the DataFrame as an update to existing Hudi dataset
updateDF.write
        .format("org.apache.hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .mode(SaveMode.Append)
        .save(hudiTablePath)

In the Spark Shell, I check the result of the update:

scala> spark.sql(sqlStatement).show()
+------------+                                                                  
|    elb_name|
+------------+
|elb_demo_001|
+------------+

Now I want to delete the same record. To delete it, I pass the EmptyHoodieRecordPayload payload in the write options:

// Write the DataFrame with an EmptyHoodieRecordPayload for deleting a record
updateDF.write
        .format("org.apache.hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
                DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
        .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY,
                "org.apache.hudi.EmptyHoodieRecordPayload")
        .mode(SaveMode.Append)
        .save(hudiTablePath)

In the Spark Shell, I see that the record is no longer available:

scala> spark.sql(sqlStatement).show()
+--------+                                                                      
|elb_name|
+--------+
+--------+

How are all those updates and deletes managed by Hudi? Let’s use the Hudi Command Line Interface (CLI) to connect to the dataset and see now those changes are interpreted as commits:

This dataset is a Copy on Write dataset, that means that each time there is an update to a record, the file that contains that record will be rewritten to contain the updated values. You can see how many records have been written for each commit. The bottom line of the table describes the initial creation of the dataset, above there is the single record update, and at the top the single record delete.

With Hudi, you can roll back to each commit. For example, I can roll back the delete operation with:

hudi:elb_logs_hudi_cow->commit rollback --commit 20191104121031

In the Spark Shell, the record is now back to where it was, just after the update:

scala> spark.sql(sqlStatement).show()
+------------+                                                                  
|    elb_name|
+------------+
|elb_demo_001|
+------------+

Copy on Write is the default storage type. I can repeat the steps above to create and update a Merge on Read dataset type by adding this to our hudiOptions:

DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ"

If you update a Merge on Read dataset and look at the commits with the Hudi CLI, you can see how different Merge on Read is compared to Copy on Write. With Merge on Read, you are only writing the updated rows and not whole files as with Copy on Write. This is why Merge on Read is helpful for use cases that require more writes, or update/delete heavy workload, with a fewer number of reads. Delta commits are written to disk as Avro records (row-based storage), and compacted data is written as Parquet files (columnar storage). To avoid creating too many delta files, Hudi will automatically compact your dataset so that your reads are as performant as possible.

When a Merge On Read dataset is created, two Hive tables are created:

  • The first table matches the name of the dataset.
  • The second table has the characters _rt appended to its name; the _rt postfix stands for real-time.

When queried, the first table return the data that has been compacted, and will not show the latest delta commits. Using this table provides the best performance, but omits the freshest data. Querying the real-time table will merge the compacted data with the delta commits on read, hence this dataset being called “Merge on Read”. This will result in the freshest data being available, but incurs a performance overhead, and is not as performant as querying the compacted data. In this way, data engineers and analysts have the flexibility to choose between performance and data freshness.

Available Now
This new feature is available now in all regions with EMR 5.28.0. There is no additional cost in using Hudi with EMR. You can learn more about Hudi in the EMR documentation. This new tool can simplify the way you process, update and delete data in S3. Let me know which use cases are you going to use it for!

Danilo

Secure your data on Amazon EMR using native EBS and per bucket S3 encryption options

Post Syndicated from Duncan Chan original https://aws.amazon.com/blogs/big-data/secure-your-data-on-amazon-emr-using-native-ebs-and-per-bucket-s3-encryption-options/

Data encryption is an effective solution to bolster data security. You can make sure that only authorized users or applications read your sensitive data by encrypting your data and managing access to the encryption key. One of the main reasons that customers from regulated industries such as healthcare and finance choose Amazon EMR is because it provides them with a compliant environment to store and access data securely.

This post provides a detailed walkthrough of two new encryption options to help you secure your EMR cluster that handles sensitive data. The first option is native EBS encryption to encrypt volumes attached to EMR clusters. The second option is an Amazon S3 encryption that allows you to use different encryption modes and customer master keys (CMKs) for individual S3 buckets with Amazon EMR.

Local disk encryption on Amazon EMR

Previously you could only choose Linux Unified Key Setup (LUKS) for at-rest encryption. You now have a choice of using LUKS or native EBS encryption to encrypt EBS volumes attached to an EMR cluster. EBS encryption provides the following benefits:

  • End-to-end encryption – When you enable EBS encryption for Amazon EMR, all data on EBS volumes, including intermediate disk spills from applications and Disk I/O between the nodes and EBS volumes, are encrypted. The snapshots that you take of an encrypted EBS volume are also encrypted and you can move them between AWS Regions as needed.
  • Amazon EMR root volumes encryption – There is no need to create a custom Amazon Linux Image for encrypting root volumes.
  • Easy auditing for encryption When you use LUKS encryption, though your EBS volumes are encrypted along with any instance store volumes, you still see EBS with Not Encrypted status when you use an Amazon EC2 API or the EC2 console to check on the encryption status. This is because the API doesn’t look into the EMR cluster to check the disk status; your auditors would need to SSH into the cluster to check for disk encrypted compliance. However, with EBS encryption, you can check the encryptions status from the EC2 console or through an EC2 API call.
  • Transparent Encryption – EBS encryption is transparent to any applications running on Amazon EMR and doesn’t require you to modify any code.

Amazon EBS encryption integrates with AWS KMS to provide the encryption keys that protect your data. To use this feature, you have to use a CMK in your account and Region. A CMK gives you control to create and manage the key, including enabling and disabling the key, controlling access, rotating the key, and deleting it. For more information, see Customer Master Keys.

Enabling EBS encryption on Amazon EMR

To enable EBS encryption on Amazon EMR, complete the following steps:

  1. Create your CMK in AWS KMS.
    You can do this either through the AWS KMS console, AWS CLI, or the AWS KMS CreateKey API. Create keys in the same Region as your EMR cluster. For more information, see Creating Keys.
  2. Give the Amazon EMR service role and EC2 instance profile permission to use your CMK on your behalf.
    If you are using the EMR_DefaultRole, add the policy with the following steps:

    • Open the AWS KMS console.
    • Choose your AWS Region.
    • Choose the key ID or alias of the CMK you created.
    • On the key details page, under Key Users, choose Add.
    • Choose the Amazon EMR service role.The name of the default role is EMR_DefaultRole.
    • Choose Attach.
    • Choose the Amazon EC2 instance profile.The name of the default role for the instance profile is EMR_EC2_DefaultRole.
    • Choose Attach.
      If you are using a customized policy, add the following code to the service role to allow Amazon EMR to create and use the CMK, with the resource being the CMK ARN:

      { 
      "Version": "2012-10-17", 
      "Statement": [ 
         { 
         "Sid": "EmrDiskEncryptionPolicy", 
         "Effect": "Allow", 
         "Action": [ 
            "kms:Encrypt", 
            "kms:Decrypt", 
            "kms:ReEncrypt*", 
            "kms:CreateGrant", 
            "kms:GenerateDataKeyWithoutPlaintext", 
            "kms:DescribeKey" 
            ], 
         "Resource": [ 
            " arn:aws:kms:region:account-id:key/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx " " 
            ] 
         } 
      ] 
      } 

       

  3. Create and configure the Amazon EMR Security configuration template.Do this either through the console or using CLI or SDK, with the following steps:
    • Open the Amazon EMR console.
    • Choose Security Configuration.
    • Under Local disk encryption, choose Enable at-rest encryption for local disks
    • For Key provider type, choose AWS KMS.
    • For AWS KMS customer master key, choose the key ARN of your CMK.This post uses the key ARN ebsEncryption_emr_default_role.
    • Select Encrypt EBS volumes with EBS encryption.

Default encryption with EC2 vs. Amazon EMR EBS encryption

EC2 has a similar feature called default encryption. With this feature, all EBS volumes in your account are encrypted without exception using a single CMK that you specify per Region. With EBS encryption from Amazon EMR, you can use different a KMS key per EMR cluster to secure your EBS volumes. You can use both EBS encryption provided by Amazon EMR and default encryption provided by EC2.

For this post, EBS encryption provided by Amazon EMR takes precedent, and you encrypt the EBS volumes attached to the cluster with the CMK that you selected in the security configuration.

S3 encryption

Amazon S3 encryption also works with Amazon EMR File System (EMRFS) objects read from and written to S3. You can use either server-side encryption (SSE) or client-side encryption (CSE) mode to encrypt objects in S3 buckets. The following table summarizes the different encryption modes available for S3 encryption in Amazon EMR.

Encryption locationKey storageKey management
SSE-S3Server side on S3S3S3
SSE-KMSServer side on S3KMS

Choose the AWS managed CMK for Amazon S3 with the alias aws/s3, or create a custom CMK.

 

CSE-KMSClient side on the EMR clusterKMSA custom CMK that you create.
CSE-CustomClient side on the EMR clusterYouYour own key provider.

The encryption choice you make depends on your specific workload requirements. Though SSE-S3 is the most straightforward option that allows you to fully delegate the encryption of S3 objects to Amazon S3 by selecting a check box, SSE-KMS or CSE-KMS are better options that give you granular control over CMKs in KMS by using policies. With AWS KMS, you can see when, where, and by whom your customer managed keys (CMK) were used, because AWS CloudTrail logs API calls for key access and key management. These logs provide you with full audit capabilities for your keys. For more information, see Encryption at Rest for EMRFS Data in Amazon S3.

Encrypting your S3 buckets with different encryption modes and keys

With S3 encryption on Amazon EMR, all the encryption modes use a single CMK by default to encrypt objects in S3. If you have highly sensitive content in specific S3 buckets, you may want to manage the encryption of these buckets separately by using different CMKs or encryption modes for individual buckets. You can accomplish this using the per bucket encryption overrides option in Amazon EMR. To do so, complete the following steps:

  1. Open the Amazon EMR console.
  2. Choose Security Configuration.
  3. Under S3 encryption, select Enable at-rest encryption for EMRFS data in Amazon S3.
  4. For Default encryption mode, choose your encryption mode.This post uses SSE-KMS.
  5. For AWS KMS customer master key, choose your key.The key you provide here encrypts all S3 buckets used with Amazon EMR. This post uses ebsEncryption_emr_default_role.
  6. Choose Per bucket encryption overrides.You can set different encryption modes for different buckets.
  7. For S3 bucket, add your S3 bucket that you want to encrypt differently.
  8. For Encryption mode, choose an encryption mode.
  9. For Encryption materials, enter your CMK.

If you have already enabled default encryption for S3 buckets directly in Amazon S3, you can also choose to bypass the S3 encryption options in the security configuration setting in Amazon EMR. This allows Amazon EMR to delegate encrypting objects in the buckets to Amazon S3, which uses the encryption key specified in the bucket policy to encrypt objects before persisting it on S3.

Summary

This post walked through the native EBS and S3 encryption options available with Amazon EMR to encrypt and secure your data. Please share your feedback on how these optimizations benefit your real-world workloads.

 


About the Author

Duncan Chan is a software development engineer for Amazon EMR. He enjoys learning and working on big data technologies. When he is not working, he will be playing with his dogs.

 

 

 

Orchestrate big data workflows with Apache Airflow, Genie, and Amazon EMR: Part 2

Post Syndicated from Francisco Oliveira original https://aws.amazon.com/blogs/big-data/orchestrate-big-data-workflows-with-apache-airflow-genie-and-amazon-emr-part-2/

Large enterprises running big data ETL workflows on AWS operate at a scale that services many internal end-users and runs thousands of concurrent pipelines. This, together with a continuous need to update and extend the big data platform to keep up with new frameworks and the latest releases of big data processing frameworks, requires an efficient architecture and organizational structure that both simplifies management of the big data platform and promotes easy access to big data applications.

In Part 1 of this post series, you learned how to use Apache Airflow, Genie, and Amazon EMR to manage big data workflows.

This post guides you through deploying the AWS CloudFormation templates, configuring Genie, and running an example workflow authored in Apache Airflow.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Solution overview

This solution uses an AWS CloudFormation template to create the necessary resources.

Users access the Apache Airflow Web UI and the Genie Web UI via SSH tunnel to the bastion host.

The Apache Airflow deployment uses Amazon ElastiCache for Redis as a Celery backend, Amazon EFS as a mount point to store DAGs, and Amazon RDS PostgreSQL for database services.

Genie uses Apache Zookeeper for leader election, an Amazon S3 bucket to store configurations (binaries, application dependencies, cluster metadata), and Amazon RDS PostgreSQL for database services. Genie submits jobs to an Amazon EMR cluster.

The architecture in this post is for demo purposes. In a production environment, the Apache Airflow and the Genie instances should be part of an Auto Scaling Group. For more information, see Deployment on the Genie Reference Guide.

The following diagram illustrates the solution architecture.

Creating and storing admin passwords in AWS Systems Manager Parameter Store

This solution uses AWS Systems Manager Parameter Store to store the passwords used in the configuration scripts. With AWS Systems Manager Parameter Store, you can create secure string parameters, which are parameters that have a plaintext parameter name and an encrypted parameter value. Parameter Store uses AWS KMS to encrypt and decrypt the parameter values of secure string parameters.

Before deploying the AWS CloudFormation templates, execute the following AWS CLI commands. These commands create AWS Systems Manager Parameter Store parameters to store the passwords for the RDS master user, the Airflow DB administrator, and the Genie DB administrator.

aws ssm put-parameter --name "/GenieStack/RDS/Settings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region

aws ssm put-parameter --name "/GenieStack/RDS/AirflowSettings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region

aws ssm put-parameter --name "/GenieStack/RDS/GenieSettings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region

Creating an Amazon S3 Bucket for the solution and uploading the solution artifacts to S3

This solution uses Amazon S3 to store all artifacts used in the solution. Before deploying the AWS CloudFormation templates, create an Amazon S3 bucket and download the artifacts required by the solution from this link.

Unzip the artifacts required by the solution and upload the airflow and genie directories to the Amazon S3 bucket you just created. Keep a record of the Amazon S3 root path because you add it as a parameter to the AWS CloudFormation template later.

As an example, the following screenshot uses the root location geniestackbucket.

Use the value of the Amazon S3 Bucket you created for the AWS CloudFormation parameters GenieS3BucketLocation and AirflowBucketLocation.

Deploying the AWS CloudFormation stack

To launch the entire solution, choose Launch Stack.

The following table explains the parameters that the template requires. You can accept the default values for any parameters not in the table. For the full list of parameters, see the AWS CloudFormation template.

ParameterValue
Location of the configuration artifactsGenieS3BucketLocationThe S3 bucket with Genie artifacts and Genie’s installation scripts. For example: geniestackbucket.
AirflowBucketLocationThe S3 bucket with the Airflow artifacts. For example: geniestackbucket.
NetworkingSSHLocationThe IP address range to SSH to the Genie, Apache Zookeeper, and Apache Airflow EC2 instances.
SecurityBastionKeyNameAn existing EC2 key pair to enable SSH access to the bastion host instance.
AirflowKeyNameAn existing EC2 key pair to enable SSH access to the Apache Airflow instance.
ZKKeyNameAn existing EC2 key pair to enable SSH access to the Apache Zookeeper instance.
GenieKeyNameAn existing EC2 key pair to enable SSH access to the Genie.
EMRKeyNameAn existing Amazon EC2 key pair to enable SSH access to the Amazon EMR cluster.
LoggingemrLogUriThe S3 location to store Amazon EMR cluster Logs. For example: s3://replace-with-your-bucket-name/emrlogs/

Post-deployment steps

To access the Apache Airflow and Genie Web Interfaces, set up an SSH and configure a SOCKS proxy for your browser. Complete the following steps:

  1. On the AWS CloudFormation console, choose the stack you created.
  2. Choose the Outputs
  3. Find the public DNS of the bastion host instance.The following screenshot shows the instance this post uses.
  4. Set up an SSH tunnel to the master node using dynamic port forwarding.
    Instead of using the master public DNS name of your cluster and the username hadoop, which the walkthrough references, use the public DNS of the bastion host instance and replace the user hadoop for the user ec2-user.
  1. Configure the proxy settings to view websites hosted on the master node.
    You do not need to modify any of the steps in the walkthrough.

This process configures a SOCKS proxy management tool that allows you to automatically filter URLs based on text patterns and limit the proxy settings to domains that match the form of the Amazon EC2 instance’s public DNS name.

Accessing the Web UI for Apache Airflow and Genie

To access the Web UI for Apache Airflow and Genie, complete the following steps:

  1. On the CloudFormation console, choose the stack you created.
  2. Choose the Outputs
  3. Find the URLs for the Apache Airflow and Genie Web UI.The following screenshot shows the URLs that this post uses.
  1. Open two tabs in your web browser. You will use the tabs for the Apache Airflow UI and the Genie UI.
  2. For the Foxy Proxy you configured previously, click the icon Foxy Proxy added to the top right section of your browser and choose Use proxies based on their predefined patterns and priorities.The following screenshot shows the proxy options.
  1. Enter the URL for the Apache Airflow Web UI and for the Genie Web UI on their respective tabs.

You are now ready to run a workflow in this solution.

Preparing application resources

The first step as a platform admin engineer is to prepare the binaries and configurations of the big data applications that the platform supports. In this post, the Amazon EMR clusters use release 5.26.0. Because Amazon EMR release 5.26.0 has Hadoop 2.8.5 and Spark 2.4.3 installed, those are the applications you want to support in the big data platform. If you decide to use a different EMR release, prepare binaries and configurations for those versions. The following sections guide you through the steps to prepare binaries should you wish to use a different EMR release version.

To prepare a Genie application resource, create a YAML file with fields that are sent to Genie in a request to create an application resource.

This file defines metadata information about the application, such as the application name, type, version, tags, the location on S3 of the setup script, and the location of the application binaries. For more information, see Create an Application in the Genie REST API Guide.

Tag structure for application resources

This post uses the following tags for application resources:

  • type – The application type, such as Spark, Hadoop, Hive, Sqoop, or Presto.
  • version – The version of the application, such as 2.8.5 for Hadoop.

The next section shows how the tags are defined in the YAML file for an application resource. You can add an arbitrary number of tags to associate with Genie resources. Genie also maintains their own tags in addition to the ones the platform admins define, which you can see in the field ID and field name of the files.

Preparing the Hadoop 2.8.5 application resource

This post provides an automated creation of the YAML file. The following code shows the resulting file details:

id: hadoop-2.8.5
name: hadoop
user: hadoop
status: ACTIVE
description: Hadoop 2.8.5 Application
setupFile: s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/setup.sh
configs: []
version: 2.8.5
type: hadoop
tags:
  - type:hadoop
  - version:2.8.5
dependencies:
  - s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/hadoop-2.8.5.tar.gz

The file is also available directly at s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/hadoop-2.8.5.yml.

NOTE: The following steps are for reference only, should you be completing this manually, rather than using the automation option provided.

The S3 objects referenced by the setupFile and dependencies labels are available in your S3 bucket. For your reference, the steps to prepare the artifacts used by properties setupFile and dependencies are as follows:

  1. Download hadoop-2.8.5.tar.gz from https://www.apache.org/dist/hadoop/core/hadoop-2.8.5/.
  2. Upload hadoop-2.8.5.tar.gz to s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/.

Preparing the Spark 2.4.3 application resource

This post provides an automated creation of the YAML file. The following code shows the resulting file details:

id: spark-2.4.3
name: spark
user: hadoop
status: ACTIVE
description: Spark 2.4.3 Application
setupFile: s3://Your_Bucket_Name/genie/applications/spark-2.4.3/setup.sh
configs: []
version: 2.4.3
type: spark
tags:
  - type:spark
  - version:2.4.3
  - version:2.4
dependencies:
  - s3://Your_Bucket_Name/genie/applications/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz

The file is also available directly at s3://Your_Bucket_Name/genie/applications/spark-2.4.3/spark-2.4.3.yml.

NOTE: The following steps are for reference only, should you be completing this manually, rather than using the automation option provided.

The objects in setupFile and dependencies are available in your S3 bucket. For your reference, the steps to prepare the artifacts used by properties setupFile and dependencies are as follows:

  1. Download spark-2.4.3-bin-hadoop2.7.tgz from https://archive.apache.org/dist/spark/spark-2.4.3/ .
  2. Upload spark-2.4.3-bin-hadoop2.7.tgz to s3://Your_Bucket_Name/genie/applications/spark-2.4.3/ .

Because spark-2.4.3-bin-hadoop2.7.tgz uses Hadoop 2.7 and not Hadoop 2.8.3, you need to extract the EMRFS libraries for Hadoop 2.7 from an EMR cluster running Hadoop 2.7 (release 5.11.3). This is already available in your S3 Bucket. For reference, the steps to extract the EMRFS libraries are as follows:

  1. Deploy an EMR cluster with release 5.11.3.
  2. Run the following command:
aws s3 cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.20.0.jar s3://Your_Bucket_Name/genie/applications/spark-2.4.3/hadoop-2.7/aws/emr/emrfs/lib/

Preparing a command resource

The next step as a platform admin engineer is to prepare the Genie commands that the platform supports.

In this post, the workflows use Apache Spark. This section shows the steps to prepare a command resource of type Apache Spark.

To prepare a Genie command resource, create a YAML file with fields that are sent to Genie in a request to create a command resource.

This file defines metadata information about the command, such as the command name, type, version, tags, the location on S3 of the setup script, and the parameters to use during command execution. For more information, see Create a Command in the Genie REST API Guide.

Tag structure for command resources

This post uses the following tag structure for command resources:

  • type – The command type, for example, spark-submit.
  • version – The version of the command, for example, 2.4.3 for Spark.

The next section shows how the tags are defined in the YAML file for a command resource. Genie also maintains their own tags in addition to the ones the platform admins define, which you can see in the field ID and field name of the files.

Preparing the spark-submit command resource

This post provides an automated creation of the YAML file. The following code shows the resulting file details:

id: spark-2.4.3_spark-submit
name: Spark Submit 
user: hadoop 
description: Spark Submit Command 
status: ACTIVE 
setupFile: s3://Your_Bucket_Name/genie/commands/spark-2.4.3_spark-submit/setup.sh
configs: [] 
executable: ${SPARK_HOME}/bin/spark-submit --master yarn --deploy-mode client 
version: 2.4.3 
tags:
  - type:spark-submit
  - version:2.4.3
checkDelay: 5000

The file is also available at s3://Your_Bucket_Name/genie/commands/spark-2.4.3_spark-submit/spark-2.4.3_spark-submit.yml.

The objects in setupFile are available in your S3 bucket.

Preparing cluster resources

This post also automated the step to prepare cluster resources; it follows a similar process as described previously but applied to cluster resources.

During the startup of the Amazon EMR cluster, a custom script creates a YAML file with the metadata details about the cluster and uploads the file to S3. For more information, see Create a Cluster in the Genie REST API Guide.

The script also extracts all Amazon EMR libraries and uploads them to S3. The next section discusses the process of registering clusters with Genie.

The script is available at s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh.

Tag structure for cluster resources

This post uses the following tag structure for cluster resources:

  • cluster.release – The Amazon EMR release name. For example, emr-5.26.0.
  • cluster.id – The Amazon EMR cluster ID. For example, j-xxxxxxxx.
  • cluster.name – The Amazon EMR cluster name.
  • cluster.role – The role associated with this cluster. For this post, the role is batch. Other possible roles would be ad hoc or Presto, for example.

You can add new tags for a cluster resource or change the values of existing tags by editing s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh.

You could also use other combinations of tags, such as a tag to identify the application lifecycle environment or required custom jars.

Genie also maintains their own tags in addition to the ones the platform admins define, which you can see in the field ID and field name of the files. If multiple clusters share the same tag, by default, Genie distributes jobs across clusters associated with the same tag randomly. For more information, see Cluster Load Balancing in the Genie Reference Guide.

Registering resources with Genie

Up to this point, all the configuration activities mentioned in the previous sections were already prepared for you.

The following sections show how to register resources with Genie. In this section you will be connecting to the bastion via SSH to run configuration commands.

Registering application resources

To register the application resources you prepared in the previous section, SSH into the bastion host and run the following command:

python /tmp/genie_assets/scripts/genie_register_application_resources.py Replace-With-Your-Bucket-Name Your-AWS-Region http://replace-with-your-genie-server-url:8080

To see the resource information, navigate to the Genie Web UI and choose the Applications tab. See the following screenshot. The screenshot shows two application resources, one for Apache Spark (version 2.4.3) and the other for Apache Hadoop (version 2.8.5).

Registering commands and associate commands with applications

The next step is to register the Genie command resources with specific applications. For this post, because spark-submit depends on Apache Hadoop and Apache Spark, associate the spark-submit command with both applications.

The order you define for the applications in file genie_register_command_resources_and_associate_applications.py is important. Because Apache Spark depends on Apache Hadoop, the file first references Apache Hadoop and then Apache Spark. See the following code:

commands = [{'command_name' : 'spark-2.4.3_spark-submit', 'applications' : ['hadoop-2.8.5', 'spark-2.4.3']}]

To register the command resources and associate them with the application resources registered in the previous step, SSH into the bastion host and run the following command:

python /tmp/genie_assets/scripts/genie_register_command_resources_and_associate_applications.py Replace-With-Your-Bucket-Name Your-AWS-Region http://replace-with-your-genie-server-url:8080

To see the command you registered plus the applications it is linked to, navigate to the Genie Web UI and choose the Commands tab.

The following screenshot shows the command details and the applications it is linked to.

Registering Amazon EMR clusters

As previously mentioned, the Amazon EMR cluster deployed in this solution registers the cluster when the cluster starts via an Amazon EMR step. You can access the script that Amazon EMR clusters use at s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh. The script also automates deregistering the cluster from Genie when the cluster terminates.

In the Genie Web UI, choose the Clusters tab. This page shows you the current cluster resources. You can also find the location of the configuration files that uploaded to the cluster S3 location during the registration step.

The following screenshot shows the cluster details and the location of configuration files (yarn-site.xml, core-site.xml, mapred-site.xml).

Linking commands to clusters

You have now registered all applications, commands, and clusters, and associated commands with the applications on which they depend. The final step is to link a command to a specific Amazon EMR cluster that is configured to run it.

Complete the following steps:

  1. SSH into the bastion host.
  2. Open /tmp/genie_assets/scripts/genie_link_commands_to_clusters.py with your preferred text editor.
  3. Look for the following lines in the code:# Change cluster_name below
    clusters = [{'cluster_name' : 'j-xxxxxxxx', 'commands' :
    ['spark-2.4.3_spark-submit']}]
  1. Replace j-xxxxxxxx in the file with the cluster_name.
    To see the name of the cluster, navigate to the Genie Web UI and choose Clusters.
  2. To link the command to a specific Amazon EMR cluster run the following command:
    python /tmp/genie_assets/scripts/genie_link_commands_to_clusters.py Replace-With-Your-Bucket-Name Your-AWS-Region http://replace-with-your-genie-server-url:8080

The command is now linked to a cluster.

In the Genie Web UI, choose the Commands tab. This page shows you the current command resources. Select spark-2.4.3_spark_submit and see the clusters associated with the command.

The following screenshot shows the command details and the clusters it is linked to.

You have configured Genie with all resources; it can now receive job requests.

Running an Apache Airflow workflow

It is out of the scope of this post to provide a detailed description of the workflow code and dataset. This section provides details of how Apache Airflow submits jobs to Genie via a GenieOperator that this post provides.

The GenieOperator for Apache Airflow

The GenieOperator allows the data engineer to define the combination of tags to identify the commands and the clusters in which the tasks should run.

In the following code example, the cluster tag is ‘emr.cluster.role:batch‘ and the command tags are ‘type:spark-submit‘ and ‘version:2.4.3‘.

spark_transform_to_parquet_movies = GenieOperator(
    task_id='transform_to_parquet_movies',
    cluster_tags=['emr.cluster.role:batch'],
    command_tags=['type:spark-submit', 'version:2.4.3'],
    command_arguments="transform_to_parquet.py s3://{}/airflow/demo/input/csv/{}  s3://{}/demo/output/parquet/{}/".format(bucket,'movies.csv',bucket,'movies'), dependencies="s3://{}/airflow/dag_artifacts/transforms/transform_to_parquet.py".format(bucket),
    description='Demo Spark Submit Job',
    job_name="Genie Demo Spark Submit Job",
    tags=['transform_to_parquet_movies'],
    xcom_vars=dict(),
    retries=3,
    dag=extraction_dag)

The property command_arguments defines the arguments to the spark-submit command, and dependencies defines the location of the code for the Apache Spark Application (PySpark).

You can find the code for the GenieOperator in the following location: s3://Your_Bucket_Name/airflow/plugins/genie_plugin.py.

One of the arguments to the DAG is the Genie connection ID (genie_conn_id). This connection was created during the automated setup of the Apache Airflow Instance. To see this and other existing connections, complete the following steps:

  1. In the Apache Airflow Web UI, choose the Admin
  2. Choose Connections.

The following screenshot shows the connection details.

The Airflow variable s3_location_genie_demo reference in the DAG was set during the installation process. To see all configured Apache Airflow variables, complete the following steps:

  1. In the Apache Airflow Web UI, choose the Admin
  2. Choose Variables.

The following screenshot shows the Variables page.

Triggering the workflow

You can now trigger the execution of the movie_lens_transfomer_to_parquet DAG. Complete the following steps:

  1. In the Apache Airflow Web UI, choose the DAGs
  2. Next to your DAG, change Off to On.

The following screenshot shows the DAGs page.

For this example DAG, this post uses a small subset of the movielens dataset. This dataset is a popular open source dataset, which you can use in exploring data science algorithms. Each dataset file is a comma-separated values (CSV) file with a single header row. All files are available in your solution S3 bucket under s3://Your_Bucket_Name/airflow/demo/input/csv .

movie_lens_transfomer_to_parquet is a simple workflow that triggers a Spark job that converts input files from CSV to Parquet.

The following screenshot shows a graphical representation of the DAG.

In this example DAG, after transform_to_parquet_movies concludes, you can potentially execute four tasks in parallel. Because the DAG concurrency is set to 3, as seen in the following code example, only three tasks can run at the same time.

# Initialize the DAG
# Concurrency --> Number of tasks allowed to run concurrently
extraction_dag = DAG(dag_name,
          default_args=dag_default_args,
          start_date=start_date,
          schedule_interval=schedule_interval,
          concurrency=3,
          max_active_runs=1
          )

Visiting the Genie job UI

The GenieOperator for Apache Airflow submitted the jobs to Genie. To see job details, in the Genie Web UI, choose the Jobs tab. You can see details such as the jobs submitted, their arguments, the cluster it is running, and the job status.

The following screenshot shows the Jobs page.

You can now experiment with this architecture by provisioning a new Amazon EMR cluster, registering it with a new value (for example, “production”) for Genie tag “emr.cluster.role”, linking the cluster to a command resource, and updating the tag combination in the GenieOperator used by some of the tasks in the DAG.

Cleaning up

To avoid incurring future charges, delete the resources and the S3 bucket created for this post.

Conclusion

This post showed how to deploy an AWS CloudFormation template that sets up a demo environment for Genie, Apache Airflow, and Amazon EMR. It also demonstrated how to configure Genie and use the GenieOperator for Apache Airflow.

 


About the Authors

Francisco Oliveira is a senior big data solutions architect with AWS. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.

 

 

 

Jelez Raditchkov is a practice manager with AWS.

 

 

 

 

Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable Big data, Machine learning, Artificial Intelligence and IoT solutions for AWS Enterprise and Strategic customers. His interests extend to various technologies such as Advanced Edge Computing, Machine learning at Edge. In his spare time, he enjoys spending time with his family.

Orchestrate big data workflows with Apache Airflow, Genie, and Amazon EMR: Part 1

Post Syndicated from Francisco Oliveira original https://aws.amazon.com/blogs/big-data/orchestrate-big-data-workflows-with-apache-airflow-genie-and-amazon-emr-part-1/

Large enterprises running big data ETL workflows on AWS operate at a scale that services many internal end-users and runs thousands of concurrent pipelines. This, together with a continuous need to update and extend the big data platform to keep up with new frameworks and the latest releases of big data processing frameworks, requires an efficient architecture and organizational structure that both simplifies management of the big data platform and promotes easy access to big data applications.

This post introduces an architecture that helps centralized platform teams maintain a big data platform to service thousands of concurrent ETL workflows, and simplifies the operational tasks required to accomplish that.

Architecture components

At high level, the architecture uses two open source technologies with Amazon EMR to provide a big data platform for ETL workflow authoring, orchestration, and execution. Genie provides a centralized REST API for concurrent big data job submission, dynamic job routing, central configuration management, and abstraction of the Amazon EMR clusters. Apache Airflow provides a platform for job orchestration that allows you to programmatically author, schedule, and monitor complex data pipelines. Amazon EMR provides a managed cluster platform that can run and scale Apache Hadoop, Apache Spark, and other big data frameworks.

The following diagram illustrates the architecture.

Apache Airflow

Apache Airflow is an open source tool for authoring and orchestrating big data workflows.

With Apache Airflow, data engineers define direct acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. Workflows are designed as a DAG that groups tasks that are executed independently. The DAG keeps track of the relationships and dependencies between tasks.

Operators define a template to define a single task in the workflow. Airflow provides operators for common tasks, and you can also define custom operators. This post discusses the custom operator (GenieOperator) to submit tasks to Genie.

A task is a parameterized instance of an operator. After an operator is instantiated, it’s referred to as a task. A task instance represents a specific run of a task. A task instance has an associated DAG, task, and point in time.

You can run DAGs and tasks on demand or schedule them to run at a specific time defined as a cron expression in the DAG.

For additional details on Apache Airflow, see Concepts in the Apache Airflow documentation.

Genie

Genie is an open source tool by Netflix that provides configuration-management capabilities and dynamic routing of jobs by abstracting access to the underlining Amazon EMR clusters.

Genie provides a REST API to submit jobs from big data applications such as Apache Hadoop MapReduce or Apache Spark. Genie manages the metadata of the underlining clusters and the commands and applications that run in the clusters.

Genie abstracts access to the processing clusters by associating one or more tags with the clusters. You can also associate tags with the metadata details for the applications and commands that the big data platform supports. As Genie receives job submissions for specific tags, it uses a combination of the cluster/command tag to route each job to the correct EMR cluster dynamically.

Genie’s data model

Genie provides a data model to capture the metadata associated with resources in your big data environment.

An application resource is a reusable set of binaries, configuration files, and setup files to install and configure applications supported by the big data platform on the Genie node that submits the jobs to the clusters. When Genie receives a job, the Genie node downloads all dependencies, configuration files, and setup files associated with the applications and stores it in a job working directory. Applications are linked to commands because they represent the binaries and configurations needed before a command runs.

Command resources represent the parameters when using the command line to submit work to a cluster and which applications need to be available on the PATH to run the command. Command resources glue metadata components together. For example, a command resource representing a Hive command would include a hive-site.xml and be associated with a set of application resources that provide the Hive and Hadoop binaries needed to run the command. Moreover, a command resource is linked to the clusters it can run on.

A cluster resource identifies the details of an execution cluster, including connection details, cluster status, tags, and additional properties. A cluster resource can register with Genie during startup and deregister during termination automatically. Clusters are linked to one or more commands that can run in it. After a command is linked to a cluster, Genie can start submitting jobs to the cluster.

Lastly, there are three job resource types: job request, job, and job execution. A job request resource represents the request submission with details to run a job. Based on the parameters submitted in the request, a job resource is created. The job resource captures details such as the command, cluster, and applications associated with the job. Additionally, information on status, start time, and end time is also available on the job resource. A job execution resource provides administrative details so you can understand where the job ran.

For more information, see Data Model on the Genie Reference Guide.

Amazon EMR and Amazon S3

Amazon EMR is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark, on AWS to process and analyze vast amounts of data. For more information, see Overview of Amazon EMR Architecture and Overview of Amazon EMR.

Data is stored in Amazon S3, an object storage service with scalable performance, ease-of-use features, and native encryption and access control capabilities. For more details on S3, see Amazon S3 as the Data Lake Storage Platform.

Architecture deep dive

Two main actors interact with this architecture: platform admin engineers and data engineers.

Platform admin engineers have administrator access to all components. They can add or remove clusters, and configure the applications and the commands that the platform supports.

Data engineers focus on writing big data applications with their preferred frameworks (Apache Spark, Apache Hadoop MR, Apache Sqoop, Apache Hive, Apache Pig, and Presto) and authoring python scripts to represent DAGs.

At high level, the team of platform admin engineers prepares the supported big data applications and its dependencies and registers them with Genie. The team of platform admin engineers launches Amazon EMR clusters that register with Genie during startup.

The team of platform admin engineers associates each Genie metadata resource (applications, commands, and clusters) with Genie tags. For example, you can associate a cluster resource with a tag named environment and the value can be “Production Environment”, “Test Environment”, or “Development Environment”.

Data engineers author workflows as Airflow DAGs and use a custom Airflow Operator—GenieOperator—to submit tasks to Genie. They can use a combination of tags to identify the type of tasks they are running plus where the tasks should run. For example, you might need to run Apache Spark 2.4.3 tasks in the environment identified by the “Production Environment” tag. To do this, set the cluster and command tags in the Airflow GenieOperator as the following code:

(cluster_tags=['emr.cluster.environment:production'],command_tags=['type:spark-submit','ver:2.4.3'])

The following diagram illustrates this architecture.

The workflow, as it corresponds to the numbers in this diagram are as follows:

  1. A platform admin engineer prepares the binaries and dependencies of the supported applications (Spark-2.4.5, Spark-2.1.0, Hive-2.3.5, etc.). The platform admin engineer also prepares commands (spark-submit, hive). The platform admin engineer registers applications and commands with Genie. Moreover, the platform admin engineer associates commands with applications and links commands to a set of clusters after step 2 (below) is concluded.
  2. Amazon EMR cluster(s) register with Genie during startup.
  3. A data engineer authors Airflow DAGs and uses the Genie tag to reference the environment, application, command or any combination of the above. In the workflow code, the data engineer uses the GenieOperator. The GenieOperator submits jobs to Genie.
  4. A schedule triggers workflow execution or a data engineer manually triggers the workflow execution. The jobs that compose the workflow are submitted to Genie for execution with a set of Genie tags that specify where the job should be run.
  5. The Genie node, working as the client gateway, will set up a working directory with all binaries and dependencies. Genie dynamically routes the jobs to the cluster(s) associated with the provided Genie tags. The Amazon EMR clusters run the jobs.

For details on the authorization and authentication mechanisms supported by Apache Airflow and Genie see Security in the Apache Airflow documentation and Security in the Genie documentation.  This architecture pattern does not expose SSH access to the Amazon EMR clusters. For details on providing different levels of access to data in Amazon S3 through EMR File System (EMRFS), see Configure IAM Roles for EMRFS Requests to Amazon S3.

Use cases enabled by this architecture

The following use cases demonstrate the capabilities this architecture provides.

Managing upgrades and deployments with no downtime and adopting the latest open source release

In a large organization, teams that use the data platform use heterogeneous frameworks and different versions. You can use this architecture to support upgrades with no downtime and offer the latest version of open source frameworks in a short amount of time.

Genie and Amazon EMR are the key components to enable this use case. As the Amazon EMR service team strives to add the latest version of the open source frameworks running on Amazon EMR in a short release cycle, you can keep up with your internal teams’ needs of the latest features of their preferred open source framework.

When a new version of the open source framework is available, you need to test it, add the new supported version and its dependencies to Genie, and move tags in the old cluster to the new one. The new cluster takes new job submissions, and the old cluster concludes jobs it is still running.

Moreover, because Genie centralizes the location of application binaries and its dependencies, upgrading binaries and dependencies in Genie also upgrades any upstream client automatically. Using Genie removes the need for upgrading all upstream clients.

Managing a centralized configuration, job and cluster status, and logging

In a universe of thousands of jobs and multiple clusters, you need to identify where a specific job is running and access logging details quickly. This architecture gives you visibility into jobs running on the data platform, logging of jobs, clusters, and their configurations.

Having programmatic access to the big data platform

This architecture enables a single point of job submissions by using Genie’s REST API. Access to the underlying cluster is abstracted through a set of APIs that enable administration tasks plus submitting jobs to the clusters. A REST API call submits jobs into Genie asynchronously. If accepted, a job-id is returned that you can use to get job status and outputs programmatically via API or web UI. A Genie node sets up the working directory and runs the job on a separate process.

You can also integrate this architecture with continuous integration and continuous delivery (CI/CD) pipelines for big data application and Apache Airflow DAGs.

Enabling scalable client gateways and concurrent job submissions

The Genie node acts as a client gateway (edge node) and can scale horizontally to make sure the client gateway resources used to submit jobs to the data platform meet demand. Moreover, Genie allows the submission of concurrent jobs.

When to use this architecture

This architecture is recommended for organizations that use multiple large, multi-tenant processing clusters instead of transient clusters. It is out of the scope of this post to address when organizations should consider always-on clusters versus transient clusters (you can use an EMR Airflow Operator to spin up Amazon EMR clusters that register with Genie, run a job, and tear them down). You should use Reserved Instances with this architecture. For more information, see Using Reserved Instances.

This architecture is especially recommended for organizations that choose to have a central platform team to administer and maintain a big data platform that supports many internal teams that require thousands of jobs to run concurrently.

This architecture might not make sense for organizations that are not at as large or don’t expect to grow to that scale. The benefits of cluster abstraction and centralized configuration management are ideal in bringing structured access to a potentially chaotic environment of thousands of concurrent workflows and hundreds of teams.

This architecture is also recommended for organizations that support a high percentage of multi-hour or overlapping workflows and heterogeneous frameworks (Apache Spark, Apache Hive, Apache Pig, Apache Hadoop MapReduce, Apache Sqoop, or Presto).

If your organization relies solely on Apache Spark and is aligned with the recommendations discussed previously, this architecture might still apply. For organizations that don’t have the scale to justify the need for centralized REST API for job submission, cluster abstraction, dynamic job routing, or centralized configuration management, Apache Livy plus Amazon EMR might be the appropriate option. Genie has its own scalable infrastructure that acts as the edge client. This means that Genie does not compete with Amazon EMR master instance resources, whereas Apache Livy does.

If the majority of your organization’s workflows are a few short-lived jobs, opting for a serverless processing layer, serverless ad hoc querying layer, or using dedicated transient Amazon EMR clusters per workflow might be more appropriate. If the majority of your organization’s workflows are composed of thousands of short-lived jobs, the architecture still applies because it removes the need to spin up and down clusters.

This architecture is recommended for organizations that require full control of the processing platform to optimize component performance. Moreover, this architecture is recommended for organizations that need to enforce centralized governance on their workflows via CI/CD pipelines.

It is out of the scope of this post to evaluate different orchestration options or the benefits of adopting Airflow as the orchestration layer. When considering adopting an architecture, also consider the existing skillset and time to adopt tooling. The open source nature of Genie may allow you to integrate other orchestration tools. Evaluating that route might be an option if you wish to adopt this architecture with another orchestration tool.

Conclusion

This post presented how to use Apache Airflow, Genie, and Amazon EMR to manage big data workflows. The post described the architecture components, the use cases the architecture supports, and when to use it. The second part of this post deploys a demo environment and walks you through the steps to configure Genie and use the GenieOperator for Apache Airflow.

 


About the Author

Francisco Oliveira is a senior big data solutions architect with AWS. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.

 

 

 

Jelez Raditchkov is a practice manager with AWS.

Install Python libraries on a running cluster with EMR Notebooks

Post Syndicated from Parag Chaudhari original https://aws.amazon.com/blogs/big-data/install-python-libraries-on-a-running-cluster-with-emr-notebooks/

Last year, AWS introduced EMR Notebooks, a managed notebook environment based on the open-source Jupyter notebook application.

This post discusses installing notebook-scoped libraries on a running cluster directly via an EMR Notebook. Before this feature, you had to rely on bootstrap actions or use custom AMI to install additional libraries that are not pre-packaged with the EMR AMI when you provision the cluster. This post also discusses how to use the pre-installed Python libraries available locally within EMR Notebooks to analyze and plot your results. This capability is useful in scenarios in which you don’t have access to a PyPI repository but need to analyze and visualize a dataset.

Benefits of using notebook-scoped libraries with EMR Notebooks

Notebook-scoped libraries provide you the following benefits:

  • Runtime installation – You can import your favorite Python libraries from PyPI repositories and install them on your remote cluster on the fly when you need them. These libraries are instantly available to your Spark runtime environment. There is no need to restart the notebook session or recreate your cluster.
  • Dependency isolation – The libraries you install using EMR Notebooks are isolated to your notebook session and don’t interfere with bootstrapped cluster libraries or libraries installed from other notebook sessions. These notebook-scoped libraries take precedence over bootstrapped libraries. Multiple notebook users can import their preferred version of the library and use it without dependency clashes on the same cluster.
  • Portable library environment – The library package installation happens from your notebook file. This allows you to recreate the library environment when you switch the notebook to a different cluster by re-executing the notebook code. At the end of the notebook session, the libraries you install through EMR Notebooks are automatically removed from the hosting EMR cluster.

Prerequisites

To use this feature in EMR Notebooks, you need a notebook attached to a cluster running EMR release 5.26.0 or later. The cluster should have access to the public or private PyPI repository from which you want to import the libraries. For more information, see Creating a Notebook.

There are different ways to configure your VPC networking to allow clusters inside the VPC to connect to an external repository. For more information, see Scenarios and Examples in the Amazon VPC User Guide.

Using notebook-scoped libraries

This post demonstrates the notebook-scoped libraries feature of EMR Notebooks by analyzing the publicly available Amazon customer reviews dataset for books. For more information, see Amazon Customer Reviews Dataset on the Registry of Open Data for AWS.

Open your notebook and make sure the kernel is set to PySpark. Run the following command from the notebook cell:

print("Welcome to my EMR Notebook!")

You get the following output:

Output shows newly created spark session.

You can examine the current notebook session configuration by running the following command:

%%info

You get the following output:

Output shows spark session properties which include Python version and properties to enable this new feature.

The notebook session is configured for Python 3 by default (through spark.pyspark.python). If you prefer to use Python 2, reconfigure your notebook session by running the following command from your notebook cell:

%%configure -f { "conf":{ "spark.pyspark.python": "python", 
                "spark.pyspark.virtualenv.enabled": "true", 
                "spark.pyspark.virtualenv.type":"native", 
                "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv" }}

You can also verify the Python version used in your current notebook session by running the following code:

import sys
sys.version

You get the following output:

Output shows Python version 3.6.8

Before starting your analysis, check the libraries that are already available on the cluster. You can do this using the list_packages() PySpark API, which lists all the Python libraries on the cluster. Run the following code:

sc.list_packages()

You get an output similar to the following code, which shows all the available Python 3-compatible packages on your cluster:

Output shows list of Python packages along with their versions.

Load the Amazon customer reviews data for books into a Spark DataFrame with the following code:

df = spark.read.parquet('s3://amazon-reviews-pds/parquet/product_category=Books/*.parquet')

You are now ready to explore the data. Determine the schema and number of available columns in your dataset with the following code:

# Total columns
print(f'Total Columns: {len(df.dtypes)}')
df.printSchema()

The following code is the output:

Output shows that Spark DataFrame has 15 columns. It also shows the schema of the Spark DataFrame.

This dataset has a total of 15 columns. You can also check the total rows in your dataset by running the following code:

# Total row
print(f'Total Rows: {df.count():,}')

You get the following output:

Output shows that Spark DataFrame has more than 20 million rows.

Check the total number of books with the following code:

# Total number of books
num_of_books = df.select('product_id').distinct().count()
print(f'Number of Books: {num_of_books:,}')

You get the following output:

Output shows that there are more than 3 million books.

You can also analyze the number of book reviews by year and find the distribution of customer ratings. To do this, import the Pandas library version 0.25.1 and the latest Matplotlib library from the public PyPI repository. Install them on the cluster attached to your notebook using the install_pypi_package API. See the following code:

sc.install_pypi_package("pandas==0.25.1") #Install pandas version 0.25.1 
sc.install_pypi_package("matplotlib", "https://pypi.org/simple") #Install matplotlib from given PyPI repository

You get the following output:

Output shows that “pandas” and “matplotlib” packages are successfully installed.

The install_pypi_package PySpark API installs your libraries along with any associated dependencies. By default, it installs the latest version of the library that is compatible with the Python version you are using. You can also install a specific version of the library by specifying the library version from the previous Pandas example.

Verify that your imported packages successfully installed by running the following code:

sc.list_packages()

You get the following output:

Output shows list of Python packages along with their versions.

You can also analyze the trend for the number of reviews provided across multiple years. Use ‘toPandas()’ to convert the Spark data frame to a Pandas data frame, which you can visualize with Matplotlib. See the following code:

# Number of reviews across years
num_of_reviews_by_year = df.groupBy('year').count().orderBy('year').toPandas()

import matplotlib.pyplot as plt
plt.clf()
num_of_reviews_by_year.plot(kind='area', x='year',y='count', rot=70, color='#bc5090', legend=None, figsize=(8,6))
plt.xticks(num_of_reviews_by_year.year)
plt.xlim(1995, 2015)
plt.title('Number of reviews across years')
plt.xlabel('Year')
plt.ylabel('Number of Reviews')

The preceding commands render the plot on the attached EMR cluster. To visualize the plot within your notebook, use %matplot magic. See the following code:

%matplot plt

The following graph shows that the number of reviews provided by customers increased exponentially from 1995 to 2015. Interestingly, 2001, 2002, and 2015 are outliers, when the number of reviews dropped from the previous years.

A line chart showing number of reviews across years.

You can analyze the distribution of star ratings and visualize it using a pie chart. See the following code:

# Distribution of overall star ratings
product_ratings_dist = df.groupBy('star_rating').count().orderBy('count').toPandas()

plt.clf()
labels = [f"Star Rating: {rating}" for rating in product_ratings_dist['star_rating']]
reviews = [num_reviews for num_reviews in product_ratings_dist['count']]
colors = ['#00876c', '#89c079', '#fff392', '#fc9e5a', '#de425b']
fig, ax = plt.subplots(figsize=(8,5))
w,a,b = ax.pie(reviews, autopct='%1.1f%%', colors=colors)
plt.title('Distribution of star ratings for books')
ax.legend(w, labels, title="Star Ratings", loc="center left", bbox_to_anchor=(1, 0, 0.5, 1))

Print the pie chart using %matplot magic and visualize it from your notebook with the following code:

%matplot plt

The following pie chart shows that 80% of users gave a rating of 4 or higher. Approximately 10% of users rated their books 2 or lower. In general, customers are happy about their book purchases from Amazon.

Output shows pie chart depicting distribution of star ratings.

Lastly, use the ‘uninstall_package’ Pyspark API to uninstall the Pandas library that you installed using the install_package API. This is useful in scenarios in which you want to use a different version of a library that you previously installed using EMR Notebooks. See the following code:

sc.uninstall_package('pandas')

You get the following output:

Output shows that “pandas” package is successfully uninstalled.

Next, run the following code:

sc.list_packages()

You get the following output:

Output shows list of Python packages along with their versions.

After closing your notebook, the Pandas and Matplot libraries that you installed on the cluster using the install_pypi_package API are garbage and collected out of the cluster.

Using local Python libraries in EMR Notebooks

The notebook-scoped libraries discussed previously require your EMR cluster to have access to a PyPI repository. If you cannot connect your EMR cluster to a repository, use the Python libraries pre-packaged with EMR Notebooks to analyze and visualize your results locally within the notebook. Unlike the notebook-scoped libraries, these local libraries are only available to the Python kernel and are not available to the Spark environment on the cluster. To use these local libraries, export your results from your Spark driver on the cluster to your notebook and use the notebook magic to plot your results locally. Because you are using the notebook and not the cluster to analyze and render your plots, the dataset that you export to the notebook has to be small (recommend less than 100 MB).

To see the list of local libraries, run the following command from the notebook cell:

%%local
conda list

You get a list of all the libraries available in the notebook. Because the list is rather long, this post doesn’t include them.

For this analysis, find out the top 10 children’s books from your book reviews dataset and analyze the star rating distribution for these children’s books.

You can identify the children’s books by using customers’ written reviews with the following code:

kids_books = (
df
.where("lower(review_body) LIKE '%child%' OR lower(review_body) LIKE '%kid%' OR lower(review_body) LIKE '%infant%'OR lower(review_body) LIKE '%Baby%'")
.select("customer_id", "product_id", "star_rating", "product_title", "year")
)

Plot the top 10 children’s books by number of customer reviews with the following code:

top_10_book_titles = kids_books.groupBy('product_title') \
                       .count().orderBy('count', ascending=False) \
                       .limit(10)
top_10_book_titles.show(10, False)

You get the following output:

Output shows list of book titles with their corresponding review count.

Analyze the customer rating distribution for these books with the following code:

top_10 = kids_books.groupBy('product_title', 'star_rating') \
           .count().join(top_10_book_titles, ['product_title'], 'leftsemi') \
           .orderBy('count', ascending=False) 
top_10.show(truncate=False)

You get the following output:

Output shows list of book titles along with start ratings and review counts.

To plot these results locally within your notebook, export the data from the Spark driver and cache it in your local notebook as a Pandas DataFrame. To achieve this, first register a temporary table with the following code:

top_10.createOrReplaceTempView("top_10_kids_books")

Use the local SQL magic to extract the data from this table with the following code:

%%sql -o top_10 -n -1
SELECT product_title, star_rating, count from top_10_kids_books
GROUP BY product_title, star_rating, count
ORDER BY count Desc

For more information about these magic commands, see the GitHub repo.

After you execute the code, you get a user-interface to interactively plot your results. The following pie chart shows the distribution of ratings:

Output shows pie chart depicting distribution of star ratings.

You can also plot more complex charts by using local Matplot and seaborn libraries available with EMR Notebooks. See the following code:

%%local 
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
sns.set()
plt.clf()
top_10['book_name'] = top_10['product_title'].str.slice(0,30)
colormap = sns.color_palette("hls", 8)
pivot_df = top_10.pivot(index= 'book_name', columns='star_rating', values='count')
pivot_df.plot.barh(stacked=True, color = colormap, figsize=(15,11))
plt.title('Top 10 children books',fontsize=16)
plt.xlabel('Number of reviews',fontsize=14)
plt.ylabel('Book',fontsize=14)
plt.subplots_adjust(bottom=0.2)

You get the following output:

Output shows stacked bar chart for top 10 children books with star ratings and review counts.

Summary

This post showed how to use the notebook-scoped libraries feature of EMR Notebooks to import and install your favorite Python libraries at runtime on your EMR cluster, and use these libraries to enhance your data analysis and visualize your results in rich graphical plots. The post also demonstrated how to use the pre-packaged local Python libraries available in EMR Notebook to analyze and plot your results.

 


About the Author

Parag Chaudhari is a software development engineer at AWS.

 

 

 

Secure your Amazon EMR cluster from unintentional network exposure with Block Public Access configuration

Post Syndicated from Vignesh Rajamani original https://aws.amazon.com/blogs/big-data/secure-your-amazon-emr-cluster-from-unintentional-network-exposure-with-block-public-access-configuration/

AWS security groups act as a network firewall that allows you to control access to your cluster to only whitelisted IP addresses. Proper management of security groups rules is critical to protect your application and data on the cluster. Amazon EMR strongly recommends creating restrictive security group rules that include the necessary network ports, protocols, and IP addresses based on your application requirements.

While AWS account administrators can protect cloud network security in different ways, a new feature helps them prevent account users from launching clusters with misconfigured security group rules. Misconfiguration can open a broad range of cluster ports to unrestricted traffic from the public internet and expose cluster resources to outside threats.

This post discusses a new account level feature called Block Public Access (BPA) configuration that helps administrators enforce a common public access rule across all of their EMR clusters in a region.

Overview of Block Public Access configuration

BPA configuration is an account-level configuration that helps you centrally manage public network access to EMR clusters in a region. You can enable this configuration in a region and block your account users from launching clusters that allow unrestricted inbound traffic from the public IP address ( source set to 0.0.0.0/0 for IPv4 and ::/0 for IPv6) through its ports. Your applications may require specific ports to be open to the internet. In that case, configure these ports (or port ranges) in the BPA configuration as exceptions to allow public access before you launch clusters.

When account users launch clusters in the region where you have enabled BPA configuration, EMR will check the port rules defined in this configuration and  compare it with inbound traffic rules specified in the security groups associated with the clusters. If these security groups have inbound rules that open ports to the public IP address but you did not configure these ports as exception in BPA configuration, then EMR will fail the cluster creation and send an exception to the user.

 Enabling BPA configuration from the AWS Management Console

To enable BPA configuration, you need permission to call PutBlockPublicAccessConfiguration API.

  • Log in to the AWS Management Console. From the console, navigate to the Amazon EMR
  • From the navigation panel, choose Block Public Access.
  • Choose Change and select On to enable BPA.

By default, all ports are blocked except port 22 for SSH traffic. To allow more ports for public access, add them as exceptions.

  • Choose Add a port range.

Before launching your cluster, define these exceptions. The port number or range should be the only ones in the security group rules that have an inbound source IP address of 0.0.0.0/0 for IPv4 and ::/0 for IPv6.

  • Enter a port number or the range of ports for public access.
  • Choose Save Changes.

Block public access section with the "Change" hyperlink circled in red.

Block public access settings, under Exceptions section +Add a port range circled in red.

For information about configuring BPA using the AWS CLI, see Configure Block Public Access.

Summary

In this post ,we discussed a new account level feature on Amazon EMR called Block Public Access  (BPA) configuration that helps administrators manage public access to their EMR clusters. You can enable BPA configuration today and prevent your EMR cluster in a region from being unintentionally exposed to public network.

 


About the Author

Vignesh Rajamani is a senior product manager for EMR at AWS.

 

Implement perimeter security in EMR using Apache Knox

Post Syndicated from Varun Rao Bhamidimarri original https://aws.amazon.com/blogs/big-data/implement-perimeter-security-in-emr-using-apache-knox/

Perimeter security helps secure Apache Hadoop cluster resources to users accessing from outside the cluster. It enables a single access point for all REST and HTTP interactions with Apache Hadoop clusters and simplifies client interaction with the cluster. For example, client applications must acquire Kerberos tickets using Kinit or SPNEGO before interacting with services on Kerberos enabled clusters. In this post, we walk through setup of Apache Knox to enable perimeter security for EMR clusters.

It provides the following benefits:

  • Simplify authentication of various Hadoop services and UIs
  • Hide service-specific URL’s/Ports by acting as a Proxy
  • Enable SSL termination at the perimeter
  • Ease management of published endpoints across multiple clusters

Overview

Apache Knox

Apache Knox provides a gateway to access Hadoop clusters using REST API endpoints. It simplifies client’s interaction with services on the Hadoop cluster by integrating with enterprise identity management solutions and hiding cluster deployment details.

In this post, we run the following setup:

  • Create a virtual private cloud (VPC) based on the Amazon VPC
  • Provision an Amazon EC2 Windows instance for Active Directory domain controller.
  • Create an Amazon EMR security configuration for Kerberos and cross-realm trust.
  • Set up Knox on EMR master node and enable LDAP authentication

Visually, we are creating the following resources:

Figure 1: Provisioned infrastructure from CloudFormation

Prerequisites and assumptions

Before getting started, the following prerequisites must be met:

IMPORTANT: The templates use hardcoded user name and passwords, and open security groups. They are not intended for production use without modification.

NOTE:

  • Single VPC has been used to simplify networking
  • CloudFormationtemplates use hardcoded user names and passwords and open security groups for simplicity.

Implementation

Single-click solution deployment

If you don’t want to set up each component individually, you can use the single-step AWS CloudFormation template. The single-step template is a master template that uses nested stacks (additional templates) to launch and configure all the resources for the solution in one operation.

To launch the entire solution, click on the Launch Stack button below that directs you to the console. Do not change to a different Region because the template is designed to work only in US-EAST-1 Region.

This template requires several parameters that you must provide. See the table below, noting the parameters marked with *, for which you have to provide values. The remaining parameters have default values and should not be edited.

For this parameterUse this
1Domain Controller NameDC1
2Active Directory domainawsknox.com
3Domain NetBIOS nameAWSKNOX (NetBIOS name of the domain (up to 15 characters).
4Domain admin userUser name for the account to be added as Domain administrator. (awsadmin)
5Domain admin password *Password for the domain admin user. Must be at least eight characters containing letters, numbers, and symbols – for example, CheckSum123
6Key pair name *Name of an existing EC2 key pair to enable access to the domain controller instance.
7Instance typeInstance type for the domain controller EC2 instance.
8LDAP Bind user nameLDAP Bind user name.
Default value is: CN=awsadmin,CN=Users,DC=awsknox,DC=com
9EMR Kerberos realmEMR Kerberos realm name. This is usually the VPC’s domain name in upper case letters Eg: EC2.INTERNAL
10Cross-realm trust password *Password for cross-realm trust Eg: CheckSum123
11Trusted Active Directory DomainThe Active Directory domain that you want to trust. This is same as Active Directory in name, but in upper case letters. Default value is “AWSKNOX.COM”
12Instance typeInstance type for the domain controller EC2 instance. Default: m4.xlarge
13Instance countNumber of core instances of EMR cluster. Default: 2
14Allowed IP addressThe client IP address that can reach your cluster. Specify an IP address range in CIDR notation (for example, 203.0.113.5/32). By default, only the VPC CIDR (10.0.0.0/16) can reach the cluster. Be sure to add your client IP range so that you can connect to the cluster using SSH.
15EMR applicationsComma-separated list of applications to install on the cluster. By default it selects “Hadoop,” “Spark,” “Ganglia,” “Hive” and “HBase”
16LDAP search baseLDAP search base: Only value is : “CN=Users,DC=awshadoop,DC=com”
17LDAP search attributeProvide LDAP user search attribute. Only value is : “sAMAccountName”
18LDAP user object classProvide LDAP user object class value. Only value is : “person”
19LDAP group search baseProvide LDAP group search base value. Only value is : “dc=awshadoop, dc=com”
20LDAP group object classProvide LDAP group object class. Only value is “group”
21LDAP member attributeProvide LDAP member attribute. Only value is : “member”
22EMRLogDir *Provide an Amazon S3 bucket where the EMRLogs are stored. Also provide “s3://” as prefix.
23S3 BucketAmazon S3 bucket where the artifacts are stored. In this case, all the artifacts are stored in “aws-bigdata-blog” public S3 bucket. Do not change this value.

Deploying each component individually

If you used the CloudFormation Template in the single-step solution, you can skip this section and start from the Access the Cluster section. This section describes how to use AWS CloudFormation templates to perform each step separately in the solution.

1.     Create and configure an Amazon VPC

In this step, we set up an Amazon VPC, a public subnet, an internet gateway, a route table, and a security group.

In order for you to establish a cross-realm trust between an Amazon EMR Kerberos realm and an Active Directory domain, your Amazon VPC must meet the following requirements:

  • The subnet used for the Amazon EMR cluster must have a CIDR block of fewer than nine digits (for example, 10.0.1.0/24).
  • Both DNS resolution and DNS hostnames must be enabled (set to “yes”).
  • The Active Directory domain controller must be the DNS server for instances in the Amazon VPC (this is configured in the next step).

To launch directly through the console, choose Launch Stack.

2.     Launch and configure an Active Directory domain controller

In this step, you use an AWS CloudFormation template to automatically launch and configure a new Active Directory domain controller and cross-realm trust.

Next, launch a windows EC2 instance and install and configure an Active Directory domain controller. In addition to launching and configuring an Active Directory domain controller and cross realm trust, this AWS CloudFormation template also sets the domain controller as the DNS server (name server) for your Amazon VPC.

To launch directly through the console, choose Launch Stack.

3.     Launch and configure EMR cluster with Apache Knox

To launch a Kerberized Amazon EMR cluster, first we must create a security configuration containing the cross-realm trust configuration. For more details on this, please refer to the blog post, Use Kerberos Authentication to integerate Amazon EMR with Microsoft Active Directory.

In addition to the steps that are described in the above blog, this adds an additional step to the EMR cluster, which creates a Kerberos principal for Knox.

The CloudFormation script also updates the below parameters in core-site.xml, hive-site.xml, hcatalog-webchat-site.xml and oozie-site.xml files. You can see these in “create_emr.py” script. Once the EMR cluster is created, it also runs a shell script as an EMR step. This shell script downloads and installs Knox software on EMR master machine. It also creates a Knox topology file with the name: emr-cluster-top.

To launch directly through the console, choose Launch Stack.

Accessing the cluster

API access to Hadoop Services

One of the main reasons to use Apache Knox is the isolate the Hadoop cluster from direct connectivity by users. Below, we demonstrate how you can interact with several Hadoop services like WebHDFS, WebHCat, Oozie, HBase, Hive, and Yarn applications going through the Knox endpoint using REST API calls. The REST calls can be called on the EMR cluster or outside of the EMR cluster. However, in a production environment, EMR cluster’s security groups should be set to only allow traffic on Knox’s port number to block traffic to all other applications.

For the purposes of this blog, we make the REST calls on the EMR cluster by SSH’ing to master node on the EMR cluster using the LDAP credentials:

ssh [email protected]<EMR-Master-Machine-Public-DNS>

Replace <EMR-Master-Machine-Public-DNS> with the value from the CloudFormation outputs to the EMR cluster’s master node. Find this CloudFormation Output value from the stack you deployed in Step 3 above.

You are prompted for the ‘awsadmin’ LDAP password. Please use the password you selected during the CloudFormation stack creation.

NOTE: In order to connect, your client machine’s IP should fall within the CIDR range specified by “Allowed IP address in the CloudFormation parameters. If you are not able to connect to the master node, check the master instance’s security group for the EMR cluster has a rule to allow traffic from your client. Otherwise, your organizations firewall may be blocking your traffic.

Demonstrating access to the WebHDFS service API:

Here we will invoke the LISTSTATUS operation on WebHDFS via the knox gateway. In our setup, knox is running on port number 8449. The below command will return a directory listing of the root directory of HDFS.

curl -ku awsadmin 'https://localhost:8449/gateway/emr-cluster-top/webhdfs/v1/?op=LISTSTATUS'

You can use both “localhost” or the private DNS of the EMR master node.

You are prompted for the password. This is the same “Domain admin password” that was passed as the parameter into the CloudFormation stack.

Demonstrating access Resource Manager service API:

The Resource manager REST API provides information about the Hadoop cluster status, applications that are running on the cluster etc. We can use the below command to get the cluster information.

curl -ikv -u awsadmin -X GET 'https://localhost:8449/gateway/emr-cluster-top/resourcemanager/v1/cluster'

You are prompted for the password. This is the same “Domain admin password” that was passed as the parameter into the CloudFormation stack.

Demonstrating connecting to Hive using Beeline through Apache Knox:

We can use Beeline, a JDBC client tool to connect to HiveServer2. Here we will connect to Beeline via Knox.

Use the following command to connect to hive shell

$hive

Use the following syntax to connect to Hive from beeline

!connect jdbc:hive2://<EMR-Master-Machine-Public-DNS>:8449/;transportMode=http;httpPath=gateway/emr-cluster-top/hive;ssl=true;sslTrustStore=/home/knox/knox/data/security/keystores/gateway.jks;trustStorePassword=CheckSum123

NOTE: You must update the <EMR-Master-Machine-Public-DNS> with the public DNS name of the EMR master node.

Demonstrating submitting an Spark job using Apache Livy through Apache Knox

You can use the following command to submit a spark job to an EMR cluster. In this example, we run SparkPi program that is available in spark-examples.jar.

curl -i -k -u awsadmin -X POST --data '{"file": "s3://aws-bigdata-blog/artifacts/aws-blog-emr-knox/spark-examples.jar", "className": "org.apache.spark.examples.SparkPi", "args": ["100"]}' -H "Content-Type: application/json" https://localhost:8449/gateway/emr-cluster-top/livy/v1/batches

You can use both “localhost” or the private DNS of EMR master node.

Securely accessing Hadoop Web UIs

In addition to providing API access to Hadoop clusters, Knox also provides proxying service for Hadoop UIs. Below is a table of available UIs:

Application NameApplication URL
1Resource Managerhttps://<EMRClusterURL>:8449/gateway/emr-cluster-top/yarn/
2Gangliahttps://<EMRClusterURL>:8449/gateway/emr-cluster-top/ganglia/
3Apache HBasehttps://<EMRClusterURL>:8449/gateway/emr-cluster-top/hbase/webui/master-status
4WebHDFShttps://<EMRClusterURL>:8449/gateway/emr-cluster-top/hdfs/
5Spark Historyhttps://<EMRClusterURL>:8449/gateway/emr-cluster-top/sparkhistory/

On the first visit of any UI above, you are presented with a drop-down for login credentials. Enter the login user awsadmin and the password you specified as a parameter to your CloudFormation template.

You can now browse the UI as you were directly connected to the cluster. Below is a sample of the Yarn UI:

And the scheduler information in the Yarn UI:

Ganglia:

Spark History UI:

Lastly, HBase UI. The entire URL to the “master-status” page must be provided

Troubleshooting

It’s always clear when there is an error interacting with Apache Knox. Below are a few troubleshooting steps.

I cannot connect to the UI. I do not get any error codes.

  • Apache Knox may not be running. Check that its running by logging into the master node of your cluster and running “ps -ef | grep knox”. There should be a process running.
ps -ef | grep knox
Knox 114022 1 0 Aug24 ? 00:04:21 /usr/lib/jvm/java/bin/java -Djava.library.path=/home/knox/knox/ext/native -jar /home/knox/knox/bin/gateway.jar

If the process is not running, start the process by running “/home/knox/knox/bin/gateway.sh start” as the Knox user (sudo su – knox).

  • Your browser may not have connectivity to the cluster. Even though you may be able to SSH to the cluster, a firewall rule or security group rule may be preventing traffic on the port number that Knox is running on. You can route traffic through SSH by building an SSH tunnel and enable port forwarding.

I get an HTTP 400, 404 or 503 code when accessing a UI:

  • Ensure that the URL you are entering is correct. If you do not enter the correct path, then Knox provides an HTTP 404.
  • There is an issue with the routing rules within Apache Knox and it does not know how to route the requests. The logs for Knox are at INFO level by default and is available in /home/knox/knox/logs/. If you want to change the logging level, change the following lines in /home/knox/knox/conf/gateway-log4j.properties:log4j.logger.org.apache.knox.gateway=INFO
    #log4j.logger.org.apache.knox.gateway=DEBUGto#log4j.logger.org.apache.knox.gateway=INFO
    log4j.logger.org.apache.knox.gateway=DEBUGThe logs will provide a lot more information such as how Knox is rewriting URL’s. This could provide insight whether Knox is translating URL’s correctly.You can use the below “ldap”, “knoxcli” and “curl” commands to verify that the setup is correct. Run these commands as “knox” user.
  • To verify search base, search attribute and search class, run the below ldap command
    ldapsearch -h <Active-Directory-Domain-Private-IP-Address> -p 389 -x -D 'CN=awsadmin,CN=Users,DC=awsknox,DC=com' -w 'CheckSum123' -b 'CN=Users,DC=awsknox,DC=com' -z 5 '(objectClass=person)' sAMAccountName

  • Replace “<Active-Directory-Domain-Private-IP-Address>” with the private IP address of the Active Directory EC2 instance. You can get this IP address from the output of second CloudFormation template.
  • To verify the values for server host, port, username, and password, run the below ldap command.
    ldapwhoami -h <Active-Directory-Domain-Private-IP-Address> -p 389 -x -D 'CN=awsadmin,CN=Users,DC=awsknox,DC=com' -w 'CheckSum123'

  • Replace “<Active-Directory-Domain-Private-IP-Address>” with the private IP address of the Active Directory EC2 instance. You can get this IP address from the output of second CloudFormation template.
  • It should display the below output:

  • To verify the System LDAP bind successful or not:
    /home/knox/knox/bin/knoxcli.sh user-auth-test --cluster emr-cluster-top --u awsadmin --p 'CheckSum123'

  • Here “emr-cluster-top” is the topology file that defines the applications that are available and the endpoints that Knox should connect to service the application.
  • The output from the command should return the below output:

“System LDAP Bind successful!”

  • To verify LDAP authentication successful or not, run the below command.
    /home/knox/knox/bin/knoxcli.sh user-auth-test --cluster emr-cluster-top --u awsadmin --p 'CheckSum123'

  • Here “emr-cluster-top” is the topology file name that we created.
  • The output the command should return the below output:

“LDAP authentication successful!”

  • Verify if WebHDFS is reachable directly using the service
  • First, we must get a valid Kerberos TGT, for that we must use the kinit command as below:
    kinit -kt /mnt/var/lib/bigtop_keytabs/knox.keytab knox/<EMR-Master-Machine-Private-DNS>@EC2.INTERNAL
    curl --negotiate -u : http://<EMR-Master-Machine-Private-DNS>:50070/webhdfs/v1/?op=GETHOMEDIRECTORY

  • For example: EMR-Master-Machine-Private-DNS appears in this format: ip-xx-xx-xx-xx.ec2.internal
  • It should return a JSON object containing a “Path” variable of the user’s home directory.

Cleanup

Delete the CloudFormation stack to clean up all the resources created for this setup. If you used the nested stack, CloudFormation deletes all resources in one operation. If you deployed the templates individually, delete them in the reverse order of creation, deleting the VPC stack last.

Conclusion

In this post, we went through the setup, configuration, and validation of Perimeter security for EMR clusters using Apache Knox. This helps simplify Authentication for various Hadoop services. In our next post, we will show you how to integrate Apache Knox and Apache Ranger to enable authorization and audits.

Stay tuned!

 


Related

 


About the Author


Varun Rao is a enterprise solutions architect
. He works with enterprise customers in their journey to the cloud with focus of data strategy and security. In his spare time, he tries to keep up with his 4-year old.

 

 

 

Mert Hocanin is a big data architect with AWS, covering several products, including EMR, Athena and Managed Blockchain. Prior to working in AWS, he has worked on Amazon.com’s retail business as a Senior Software Development Engineer, building a data lake to process vast amounts of data from all over the company for reporting purposes. When not building and designing data lakes, Mert enjoys traveling and food.

 

 

 

Photo of Srikanth KodaliSrikanth Kodali is a Sr. IOT Data analytics architect at Amazon Web Services. He works with AWS customers to provide guidance and technical assistance on building IoT data and analytics solutions, helping them improve the value of their solutions when using AWS.

 

 

 

Run Spark applications with Docker using Amazon EMR 6.0.0 (Beta)

Post Syndicated from Paul Codding original https://aws.amazon.com/blogs/big-data/run-spark-applications-with-docker-using-amazon-emr-6-0-0-beta/

The Amazon EMR team is excited to announce the public beta release of EMR 6.0.0 with Spark 2.4.3, Hadoop 3.1.0, Amazon Linux 2, and Amazon Corretto 8. With this beta release, Spark users can use Docker images from Docker Hub and Amazon Elastic Container Registry (Amazon ECR) to define environment and library dependencies. Using Docker, users can easily define their dependencies and use them for individual jobs, avoiding the need to install dependencies on individual cluster hosts.

This post shows you how to use Docker with the EMR release 6.0.0 Beta. You’ll learn how to launch an EMR release 6.0.0 Beta cluster and run Spark jobs using Docker containers from both Docker Hub and Amazon ECR.

Hadoop 3 Docker support

EMR 6.0.0 (Beta) includes Hadoop 3.1.0, which allows the YARN NodeManager to launch containers either directly on the host machine of the cluster, or inside a Docker container. Docker containers provide a custom execution environment in which the application’s code runs isolated from the execution environment of the YARN NodeManager and other applications.

These containers can include special libraries needed by the application, and even provide different versions of native tools and libraries such as R, Python, Python libraries. This allows you to easily define the libraries and runtime dependencies that your applications need, using familiar Docker tooling.

Clusters running the EMR 6.0.0 (Beta) release are configured by default to allow YARN applications such as Spark to run using Docker containers. To customize this, use the configuration for Docker support defined in the yarn-site.xml and container-executor.cfg files available in the /etc/hadoop/conf directory. For details on each configuration option and how it is used, see Launching Applications Using Docker Containers.

You can choose to use Docker when submitting a job. On job submission, the following variables are used to specify the Docker runtime and Docker image used:

    • YARN_CONTAINER_RUNTIME_TYPE=docker
    • YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={DOCKER_IMAGE_NAME}

When you use Docker containers to execute your YARN applications, YARN downloads the Docker image specified when you submit your job. For YARN to resolve this Docker image, it must be configured with a Docker registry. Options to configure a Docker registry differ based on how you chose to deploy EMR (using either a public or private subnet).

Docker registries

A Docker registry is a storage and distribution system for Docker images. For EMR 6.0.0 (Beta), the following Docker registries can be configured:

  • Docker Hub: A public Docker registry containing over 100,000 popular Docker images.
  • Amazon ECR: A fully-managed Docker container registry that allows you to create your own custom images and host them in a highly available and scalable architecture.

Deployment considerations

Docker registries require network access from each host in the cluster, as each host downloads images from the Docker registry when your YARN application is running on the cluster. How you choose to deploy your EMR cluster (launching it into a public or private subnet) may limit your choice of Docker registry due to network connectivity requirements.

Public subnet

With EMR public subnet clusters, nodes running YARN NodeManager can directly access any registry available over the internet, such as Docker Hub, as shown in the following diagram.

Private Subnet

With EMR private subnet clusters, nodes running YARN NodeManager don’t have direct access to the internet.  Docker images can be hosted in the ECR and accessed through AWS PrivateLink, as shown in the following diagram.

For details on how to use AWS PrivateLink to allow access to ECR in a private subnet scenario, see Setting up AWS PrivateLink for Amazon ECS, and Amazon ECR.

Configuring Docker registries

Docker must be configured to trust the specific registry used to resolve Docker images. The default trust registries are local (private) and centos (on public Docker Hub). You can override docker.trusted.registries in /etc/hadoop/conf/container-executor.cfg to use other public repositories or ECR. To override this configuration, use the EMR Classification API with the container-executor classification key.

The following example shows how to configure the cluster to trust both a public repository (your-public-repo) and an ECR registry (123456789123.dkr.ecr.us-east-1.amazonaws.com). When using ECR, replace this endpoint with your specific ECR endpoint.  When using Docker Hub, please replace this repository name with your actual repository name.

[
  {
    "Classification": "container-executor",
    "Configurations": [
        {
            "Classification": "docker",
            "Properties": {
                "docker.trusted.registries": "local,centos, your-public-repo,123456789123.dkr.ecr.us-east-1.amazonaws.com",
                "docker.privileged-containers.registries": "local,centos, your-public-repo,123456789123.dkr.ecr.us-east-1.amazonaws.com"
            }
        }
    ]
  }
]

To launch an EMR 6.0.0 (Beta) cluster with this configuration using the AWS Command Line Interface (AWS CLI), create a file named container-executor.json with the contents of the preceding JSON configuration.  Then, use the following commands to launch the cluster:

$ export KEYPAIR=<Name of your Amazon EC2 key-pair>
$ export SUBNET_ID=<ID of the subnet to which to deploy the cluster>
$ export INSTANCE_TYPE=<Name of the instance type to use>
$ export REGION=<Region to which to deploy the cluster deployed>

$ aws emr create-cluster \
    --name "EMR-6-Beta Cluster" \
    --region $REGION \
    --release-label emr-6.0.0-beta \
    --applications Name=Hadoop Name=Spark \
    --service-role EMR_DefaultRole \
    --ec2-attributes KeyName=$KEYPAIR,InstanceProfile=EMR_EC2_DefaultRole,SubnetId=$SUBNET_ID \
    --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=$INSTANCE_TYPE InstanceGroupType=CORE,InstanceCount=2,InstanceType=$INSTANCE_TYPE \
    --configuration file://container-executor.json

Using ECR

If you’re new to ECR, follow the instructions in Getting Started with Amazon ECR and verify you have access to ECR from each instance in your EMR cluster.

To access ECR using the docker command, you must first generate credentials. To make sure that YARN can access images from ECR, pass a reference to those generated credentials using the container environment variable YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG.

Run the following command on one of the core nodes to get the login line for your ECR account.

$ aws ecr get-login --region us-east-1 --no-include-email

The get-login command generates the correct Docker CLI command to run to create credentials. Copy and run the output from get-login.

$ sudo docker login -u AWS -p <password> https://<account-id>.dkr.ecr.us-east-1.amazonaws.com

This command generates a config.json file in the /root/.docker folder.  Copy this file to HDFS so that jobs submitted to the cluster can use it to authenticate to ECR.

Execute the commands below to copy the config.json file to your home directory.

$ mkdir -p ~/.docker
$ sudo cp /root/.docker/config.json ~/.docker/config.json
$ sudo chmod 644 ~/.docker/config.json

Execute the commands below to put the config.json in HDFS so it may be used by jobs running on the cluster.

$ hadoop fs -put ~/.docker/config.json /user/hadoop/

At this point, YARN can access ECR as a Docker image registry and pull containers during job execution.

Using Spark with Docker

With EMR 6.0.0 (Beta), Spark applications can use Docker containers to define their library dependencies, instead of requiring dependencies to be installed on the individual Amazon EC2 instances in the cluster. This integration requires configuration of the Docker registry, and definition of additional parameters when submitting a Spark application.

When the application is submitted, YARN invokes Docker to pull the specified Docker image and run the Spark application inside of a Docker container. This allows you to easily define and isolate dependencies. It reduces the time spent bootstrapping or preparing instances in the EMR cluster with the libraries needed for job execution.

When using Spark with Docker, make sure that you consider the following:

  • The docker package and CLI are only installed on core and task nodes.
  • The spark-submit command should always be run from a master instance on the EMR cluster.
  • The Docker registries used to resolve Docker images must be defined using the Classification API with the container-executor classification key to define additional parameters when launching the cluster:
    • docker.trusted.registries
    • docker.privileged-containers.registries
  • To execute a Spark application in a Docker container, the following configuration options are necessary:
    • YARN_CONTAINER_RUNTIME_TYPE=docker
    • YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={DOCKER_IMAGE_NAME}
  • When using ECR to retrieve Docker images, you must configure the cluster to authenticate itself. To do so, you must use the following configuration option:
    • YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG={DOCKER_CLIENT_CONFIG_PATH_ON_HDFS}
  • Mount the /etc/passwd file into the container so that the user running the job can be identified in the Docker container.
    • YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro
  • Any Docker image used with Spark must have Java installed in the Docker image.

Creating a Docker image

Docker images are created using a Dockerfile, which defines the packages and configuration to include in the image.  The following two example Dockerfiles use PySpark and SparkR.

PySpark Dockerfile

Docker images created from this Dockerfile include Python 3 and the numpy Python package.  This Dockerfile uses Amazon Linux 2 and the Amazon Corretto JDK 8.

FROM amazoncorretto:8

RUN yum -y update
RUN yum -y install yum-utils
RUN yum -y groupinstall development

RUN yum list python3*
RUN yum -y install python3 python3-dev python3-pip python3-virtualenv

RUN python -V
RUN python3 -V

ENV PYSPARK_DRIVER_PYTHON python3
ENV PYSPARK_PYTHON python3

RUN pip3 install --upgrade pip
RUN pip3 install numpy panda

RUN python3 -c "import numpy as np"

SparkR Dockerfile

Docker images created from this Dockerfile include R and the randomForest CRAN package. This Dockerfile includes Amazon Linux 2 and the Amazon Corretto JDK 8.

FROM amazoncorretto:8

RUN java -version

RUN yum -y update
RUN amazon-linux-extras enable R3.4

RUN yum -y install R R-devel openssl-devel
RUN yum -y install curl

#setup R configs
RUN echo "r <- getOption('repos'); r['CRAN'] <- 'http://cran.us.r-project.org'; options(repos = r);" > ~/.Rprofile

RUN Rscript -e "install.packages('randomForest')"

For more information on Dockerfile syntax, see the Dockerfile reference documentation.

Using Docker images from ECR

Amazon Elastic Container Registry (ECR) is a fully-managed Docker container registry that makes it easy for developers to store, manage, and deploy Docker container images. When using ECR, the cluster must be configured to trust your instance of ECR, and you must configure authentication in order for the cluster to use Docker images from ECR.

In this example, our cluster must be created with the following additional configuration, to ensure the ECR registry is trusted. Please replace the 123456789123.dkr.ecr.us-east-1.amazonaws.com endpoint with your ECR endpoint.

[
  {
    "Classification": "container-executor",
    "Configurations": [
        {
            "Classification": "docker",
            "Properties": {
                "docker.trusted.registries": "local,centos,123456789123.dkr.ecr.us-east-1.amazonaws.com",
                "docker.privileged-containers.registries": "local,centos, 123456789123.dkr.ecr.us-east-1.amazonaws.com"
            }
        }
    ]
  }
]

Using PySpark with ECR

This example uses the PySpark Dockerfile.  It will be tagged and upload to ECR. Once uploaded, you will run the PySpark job and reference the Docker image from ECR.

After you launch the cluster, use SSH to connect to a core node and run the following commands to build the local Docker image from the PySpark Dockerfile example.

First, create a directory and a Dockerfile for our example.

$ mkdir pyspark

$ vi pyspark/Dockerfile

Paste the contents of the PySpark Dockerfile and run the following commands to build a Docker image.

$ sudo docker build -t local/pyspark-example pyspark/

Create the emr-docker-examples ECR repository for our examples.

$ aws ecr create-repository --repository-name emr-docker-examples

Tag and upload the locally built image to ECR, replacing 123456789123.dkr.ecr.us-east-1.amazonaws.com with your ECR endpoint.

$ sudo docker tag local/pyspark-example 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:pyspark-example
$ sudo docker push 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:pyspark-example

Use SSH to connect to the master node and prepare a Python script with the filename main.py. Paste the following content into the main.py file and save it.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("docker-numpy").getOrCreate()
sc = spark.sparkContext

import numpy as np
a = np.arange(15).reshape(3, 5)
print(a)

To submit the job, reference the name of the Docker. Define the additional configuration parameters to make sure that the job execution uses Docker as the runtime. When using ECR, the YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG must reference the config.json file containing the credentials used to authenticate to ECR.

$ DOCKER_IMAGE_NAME=123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:pyspark-example
$ DOCKER_CLIENT_CONFIG=hdfs:///user/hadoop/config.json
$ spark-submit --master yarn \
--deploy-mode cluster \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG=$DOCKER_CLIENT_CONFIG \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG=$DOCKER_CLIENT_CONFIG \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro \
--num-executors 2 \
main.py -v

When the job has completed, take note of the YARN application ID, and use the following command to obtain the output of the PySpark job.

$ yarn logs --applicationId application_id | grep -C2 '\[\['
LogLength:55
LogContents:
[[ 0  1  2  3  4]
 [ 5  6  7  8  9]
 [10 11 12 13 14]]

Using SparkR with ECR

This example uses the SparkR Dockerfile. It will be tagged and upload to ECR. Once uploaded, you will run the SparkR job and reference the Docker image from ECR.

After you launch the cluster, use SSH to connect to a core node and run the following commands to build the local Docker image from the SparkR Dockerfile example.

First, create a directory and the Dockerfile for this example.

$ mkdir sparkr

$ vi sparkr/Dockerfile

Paste the contents of the SparkR Dockerfile and run the following commands to build a Docker image.

$ sudo docker build -t local/sparkr-example sparkr/

Tag and upload the locally built image to ECR, replacing 123456789123.dkr.ecr.us-east-1.amazonaws.com with your ECR endpoint.

$ sudo docker tag local/sparkr-example 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:sparkr-example
$ sudo docker push 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:sparkr-example

Use SSH to connect to the master node and prepare an R script with name sparkR.R. Paste the following contents into the sparkR.R file.

library(SparkR)
sparkR.session(appName = "R with Spark example", sparkConfig = list(spark.some.config.option = "some-value"))

sqlContext <- sparkRSQL.init(spark.sparkContext)
library(randomForest)
# check release notes of randomForest
rfNews()

sparkR.session.stop()

To submit the job, reference the name of the Docker. Define the additional configuration parameters to make sure that the job execution uses Docker as the runtime. When using ECR, the YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG must reference the config.json file containing the credentials used to authenticate to ECR.

$ DOCKER_IMAGE_NAME=123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:sparkr-example
$ DOCKER_CLIENT_CONFIG=hdfs:///user/hadoop/config.json
$ spark-submit --master yarn \
--deploy-mode cluster \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG=$DOCKER_CLIENT_CONFIG \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG=$DOCKER_CLIENT_CONFIG \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro \
sparkR.R

When the job has completed, note the YARN application ID, and use the following command to obtain the output of the SparkR job. This example includes testing to make sure that the randomForest library, version installed, and release notes are available.

$ yarn logs --applicationId application_id | grep -B4 -A10 "Type rfNews"
randomForest 4.6-14
Type rfNews() to see new features/changes/bug fixes.
Wishlist (formerly TODO):

* Implement the new scheme of handling classwt in classification.

* Use more compact storage of proximity matrix.

* Allow case weights by using the weights in sampling?

========================================================================
Changes in 4.6-14:

Using a Docker image from Docker Hub

To use Docker Hub, you must deploy your cluster to a public subnet, and configure it to use Docker Hub as a trusted registry. In this example, the cluster needs the following additional configuration to to make sure that the your-public-repo repository on Docker Hub is trusted. When using Docker Hub, please replace this repository name with your actual repository.

[
  {
    "Classification": "container-executor",
    "Configurations": [
        {
            "Classification": "docker",
            "Properties": {
                "docker.trusted.registries": "local,centos,your-public-repo ",
                "docker.privileged-containers.registries": "local,centos,your-public-repo"
            }
        }
    ]
  }
]

Beta limitations

EMR 6.0.0 (Beta) focuses on helping you get value from using Docker with Spark to simplify dependency management. You can also use EMR 6.0.0 (Beta) to get familiar with Amazon Linux 2, and Amazon Corretto JDK 8.

The EMR 6.0.0 (Beta) supports the following applications:

  • Spark 2.4.3
  • Livy 0.6.0
  • ZooKeeper 3.4.14
  • Hadoop 3.1.0

This beta release is supported in the following Regions:

  • US East (N. Virginia)
  • US West (Oregon)

The following EMR features are currently not available with this beta release:

  • Cluster integration with AWS Lake Formation
  • Native encryption of Amazon EBS volumes attached to an EMR cluster

Conclusion

In this post, you learned how to use an EMR 6.0.0 (Beta) cluster to run Spark jobs in Docker containers and integrate with both Docker Hub and ECR. You’ve seen examples of both PySpark and SparkR Dockerfiles.

The EMR team looks forward to hearing about how you’ve used this integration to simplify dependency management in your projects. If you have questions or suggestions, feel free to leave a comment.


About the Authors

Paul Codding is a senior product manager for EMR at Amazon Web Services.

 

 

 

 

Ajay Jadhav is a software development engineer for EMR at Amazon Web Services.

 

 

 

 

Rentao Wu is a software development engineer for EMR at Amazon Web Services.

 

 

 

 

Stephen Wu is a software development engineer for EMR at Amazon Web Services.