All posts by Michael Sambol

Provisioning the Intuit Data Lake with Amazon EMR, Amazon SageMaker, and AWS Service Catalog

Post Syndicated from Michael Sambol original https://aws.amazon.com/blogs/big-data/provisioning-the-intuit-data-lake-with-amazon-emr-amazon-sagemaker-and-aws-service-catalog/

This post shares Intuit’s learnings and recommendations for running a data lake on AWS. The Intuit Data Lake is built and operated by numerous teams in Intuit Data Platform. Thanks to Tristan Baker (Chief Architect), Neil Lamka (Principal Product Manager), Achal Kumar (Development Manager), Nicholas Audo, and Jimmy Armitage for their feedback and support.

A data lake is a centralized repository for storing structured and unstructured data at any scale. At Intuit, creating such a pile of raw data is easy. However, more interesting challenges present themselves:

  1. How should AWS accounts be organized?
  2. What ingestion methods will be used? How will analysts find the data they need?
  3. Where should data be stored? How should access be managed?
  4. What security measures are needed to protect Intuit’s sensitive data?
  5. Which parts of this ecosystem can be automated?

This post outlines the approach taken by Intuit, though it is important to remember that there are many ways to build a data lake (for example, AWS Lake Formation).

We’ll cover the technologies and processes involved in creating the Intuit Data Lake at a high level, including the overall structure and the automation used in provisioning accounts and resources. Watch this space in the future for more detailed blog posts on specific aspects of the system, from the other teams and engineers who worked together to build the Intuit Data Lake.

Architecture

Account Structure

Data lakes typically follow a hub-and-spoke model, with the hub account containing shared services that control access to data sources. For the purposes of this post, we’ll refer to the hub account as Central Data Lake.

In this pattern, access to Central Data Lake is apportioned to spoke accounts called Processing Accounts. This model maintains separation between end users and allows for division of billing among distinct business units.

 

 

It is common to maintain two ecosystems: pre-production (Pre-Prod) and production (Prod). This allows data lake administrators to silo access to data by preventing connectivity between Pre-Prod and Prod.

To enable experimentation and testing, it may also be advisable to maintain separate VPC-based environments within Pre-Prod accounts, such as dev, qa, and e2e. Processing Account VPCs would then be connected to the corresponding VPC in Central Data Lake.

Note that at first, we connected accounts via VPC Peering. However, as we scaled we quickly approached the hard limit of 125 VPC peering connections, requiring us to migrate to AWS Transit Gateway. As of this writing, we connect multiple new Processing Accounts weekly.

 

 

Central Data Lake

There may be numerous services running in a hub account, but we’ll focus on the aspects that are most relevant to this blog: ingestion, sanitization, storage, and a data catalog.

 

 

Ingestion, Sanitization, and Storage

A key component to Central Data Lake is a uniform ingestion pattern for streaming data. One example is an Apache Kafka cluster running on Amazon EC2. (You can read about how Intuit engineers do this in another AWS blog.) As we deal with hundreds of data sources, we’ve enabled access to ingestion mechanisms via AWS PrivateLink.

Note: Amazon Managed Streaming for Apache Kafka (Amazon MSK) is an alternative for running Apache Kafka on Amazon EC2, but was not available at the start of Intuit’s migration.

In addition to stream processing, another method of ingestion is batch processing, such as jobs running on Amazon EMR. After data is ingested by one of these methods, it can be stored in Amazon S3 for further processing and analysis.

Intuit deals with a large volume of customer data, and each field is carefully considered and classified with a sensitivity level. All sensitive data that enters the lake is encrypted at the source. The ingestion systems retrieve the encrypted data and move it into the lake. Before it is written to S3, the data is sanitized by a proprietary RESTful service. Analysts and engineers operating within the data lake consume this masked data.

Data Catalog

A data catalog is a common way to give end users information about the data and where it lives. One example is a Hive Metastore backed by Amazon Aurora. Another alternative is the AWS Glue Data Catalog.

Processing Accounts

When Processing Accounts are delivered to end users, they include an identical set of resources. We’ll discuss the automation of Processing Accounts below, but the primary components are as follows:

 

 

                           Processing Account structure upon delivery to the customer

 

Data Storage Mechanisms

One reasonable question is whether all data should reside in Central Data Lake, or if it’s acceptable to distribute data across multiple accounts. A data lake might employ a combination of the two approaches, and classify data locations as primary or secondary.

The primary location for data is Central Data Lake, and it arrives there via the ingestion pipelines discussed previously. Processing Accounts can read from the primary source, either directly from the ingestion pipelines or from S3. Processing Accounts can contribute their transformed data back into Central Data Lake (primary), or store it in their own accounts (secondary). The proper storage location depends on the type of data, and who needs to consume it.

One rule worth enforcing is that no cross-account writes should be permitted. In other words, the IAM principal (in most cases, an IAM role assumed by EC2 via an instance profile) must be in the same account as the destination S3 bucket. This is because cross-account delegation is not supported—specifically, S3 bucket policies in Central Data Lake cannot grant Processing Account A access to objects written by a role in Processing Account B.

Another possibility is for EMR to assume different IAM roles via a custom credentials provider (see this AWS blog), but we chose not to go down this path at Intuit because it would have required many EMR jobs to be rewritten.

 

 

Data Access Patterns

The majority of end users are interested in the data that resides in S3. In Central Data Lake and some Processing Accounts, there may be a set of read-only S3 buckets: any account in the data lake ecosystem can read data from this type of bucket.

To facilitate management of S3 access for read-only buckets, we built a mechanism to control S3 bucket policies, administered entirely via code. Our deployment pipelines use account metadata to dynamically generate the correct S3 bucket policy based on the type of account (Pre-Prod or Prod). These policies are committed back into our code repository for auditability and ease of management.

We employ the same method for managing KMS key policies, as we use KMS with customer managed customer master keys (CMKs) for at-rest encryption in S3.

Here’s an example of a generated S3 bucket policy for a read-only bucket:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ProcessingAccountReadOnly",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::111111111111:root",
                    "arn:aws:iam::222222222222:root",
                    "arn:aws:iam::333333333333:root",
                    "arn:aws:iam::444444444444:root",
                    "arn:aws:iam::555555555555:root",
                    ...
                    ...
                    ...
                    "arn:aws:iam::999999999999:root",
                ]
            },
            "Action": [
                "s3:ListBucket",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::intuit-data-lake-example/*",
                "arn:aws:s3:::intuit-data-lake-example"
            ]
        }
    ]
}

Note that we grant access at the account level, rather than using explicit IAM principal ARNs. Because the reads are cross-account, permissions are also required on the IAM principals in Processing Accounts. Maintaining these policies—with automation, at that level of granularity—is untenable at scale. Furthermore, using specific IAM principal ARNs would create an external dependency on foreign accounts. For example, if a Processing Account deletes an IAM role that is referenced in an S3 bucket policy in Central Data Lake, the bucket policy can no longer be saved, causing interruptions to deployment pipelines.

Security

Security is mission critical for any data lake. We’ll mention a subset of the controls we use, but not dive deep.

Encryption

Encryption can be enforced both in transit and at rest, using multiple methods:

  1. Traffic within the lake should use the latest version of TLS (1.2 as of this writing)
  2. Data can be encrypted with application-level (client-side) encryption
  3. KMS keys can used for at-rest encryption of S3, EBS, and RDS

Ingress and Egress

There’s nothing out of the ordinary in our approach to ingress and egress, but it’s worth mentioning the standard patterns we’ve found important:

Policies restricting ingress and egress are the primary points at which a data lake can guarantee quality (ingress) and prevent loss (egress).

Authorization

Access to the Intuit Data Lake is controlled via IAM roles, meaning no IAM users (with long-term credentials) are created. End users are granted access via an internal service that manages role-based, federated access to AWS accounts. Regular reviews are conducted to remove nonessential users.

Configuration Management

We use an internal fork of Cloud Custodian, which is a suite of preventative, detective, and responsive controls consisting of Amazon CloudWatch Events and AWS Config rules. Some of the violations it reports and (optionally) mitigates include:

  • Unauthorized CIDRs in inbound security group rules
  • Public S3 bucket policies and ACLs
  • IAM user console access
  • Unencrypted S3 buckets, EBS volumes, and RDS instances

Lastly, Amazon GuardDuty is enabled in all Intuit Data Lake accounts and is monitored by Intuit Security.

Automation

If there is one thing we’ve learned building the Intuit Data Lake, it is to automate everything.

There are four areas of automation we’ll discuss in this blog:

  1. Creation of Processing Accounts
  2. Processing Account Orchestration Pipeline
  3. Processing Account Terraform Pipeline
  4. EMR and SageMaker deployment via Service Catalog

Creation of Processing Accounts

The first step in creating a Processing Account is to make a request through an internal tool. This triggers automation that provisions an Intuit-stamped AWS account under the correct business unit.

 

Note: AWS Control Tower’s Account Factory was not available at the start of our journey, but it can be leveraged to provision new AWS accounts in a secured, best practice, self-service way.

Account setup also includes automated VPC creation (with optional VPN), fully automated using Service Catalog. End users simply specify subnet sizes.

It’s worth noting that Intuit leverages Service Catalog for self-service deployment of other common patterns, including ingress security groups, VPC endpoints, and VPC peering. Here’s an example portfolio:

Processing Account Orchestration Pipeline

After account creation and VPC provisioning, the Processing Account Orchestration Pipeline runs. This pipeline executes one-time tasks required for Processing Accounts. These tasks include:

  • Bootstrapping an IAM role for use in further configuration management
  • Creation of KMS keys for S3, EBS, and RDS encryption
  • Creation of variable files for the new account
  • Updating the master configuration file with account metadata
  • Generation of scripts to orchestrate the Terraform pipeline discussed below
  • Sharing Transit Gateways via Resource Access Manager

Processing Account Terraform Pipeline

This pipeline manages the lifecycle of dynamic, frequently-updated resources, including IAM roles, S3 buckets and bucket policies, KMS key policies, security groups, NACLs, and bastion hosts.

There is one pipeline for every Processing Account, and each pipeline deploys a series of layers into the account, using a set of parameterized deployment jobs. A layer is a logical grouping of Terraform modules and AWS resources, providing a way to shrink Terraform state files and reduce blast radius if redeployment of specific resources is required.

EMR and SageMaker Deployment via Service Catalog

AWS Service Catalog facilitates the provisioning of Amazon EMR and Amazon SageMaker, allowing end users to launch EMR clusters and SageMaker notebook instances that work out of the box, with embedded security.

Service Catalog allows data scientists and data engineers to launch EMR clusters in a self-service fashion with user-friendly parameters, and provides them with the following:

  • Bootstrap action to enable connectivity to services in Central Data Lake
  • EC2 instance profile to control S3, KMS, and other granular permissions
  • Security configuration that enables at-rest and in-transit encryption
  • Configuration classifications for optimal EMR performance
  • Encrypted AMI with monitoring and logging enabled
  • Custom Kerberos connection to LDAP

For SageMaker, we use Service Catalog to launch notebook instances with custom lifecycle configurations that set up connections or initialize the following: Hive Metastore, Kerberos, security, Splunk logging, and OpenDNS. You can read more about lifecycle configurations in this AWS blog. Launching a SageMaker notebook instance with best-practice configuration is as easy as follows:

 

 

Conclusion

This post illustrates the building blocks we used in creating the Intuit Data Lake. Our solution isn’t wholly unique, but comprised of common-sense approaches we’ve gleaned from dozens of engineers across Intuit, representing decades of experience. These practices have enabled us to push petabytes of data into the lake, and serve hundreds of Processing Accounts with varying needs. We are still building, but we hope our story helps you in your data lake journey.

The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.

 


About the Authors

Michael Sambol is a senior consultant at AWS. He holds an MS in computer science from Georgia Tech. Michael enjoys working out, playing tennis, traveling, and watching Western movies.

 

 

 

 

Ben Covi is a staff software engineer at Intuit. At any given moment, he’s probably losing a game of Catan.

 

 

 

Trigger cross-region replication of pre-existing objects using Amazon S3 inventory, Amazon EMR, and Amazon Athena

Post Syndicated from Michael Sambol original https://aws.amazon.com/blogs/big-data/trigger-cross-region-replication-of-pre-existing-objects-using-amazon-s3-inventory-amazon-emr-and-amazon-athena/

In Amazon Simple Storage Service (Amazon S3), you can use cross-region replication (CRR) to copy objects automatically and asynchronously across buckets in different AWS Regions. CRR is a bucket-level configuration, and it can help you meet compliance requirements and minimize latency by keeping copies of your data in different Regions. CRR replicates all objects in the source bucket, or optionally a subset, controlled by prefix and tags.

Objects that exist before you enable CRR (pre-existing objects) are not replicated. Similarly, objects might fail to replicate (failed objects) if permissions aren’t in place, either on the IAM role used for replication or the bucket policy (if the buckets are in different AWS accounts).

In our work with customers, we have seen situations where large numbers of objects aren’t replicated for the previously mentioned reasons. In this post, we show you how to trigger cross-region replication for pre-existing and failed objects.

Methodology

At a high level, our strategy is to perform a copy-in-place operation on pre-existing and failed objects. This operation uses the Amazon S3 API to copy the objects over the top of themselves, preserving tags, access control lists (ACLs), metadata, and encryption keys. The operation also resets the Replication_Status flag on the objects. This triggers cross-region replication, which then copies the objects to the destination bucket.

To accomplish this, we use the following:

  • Amazon S3 inventory to identify objects to copy in place. These objects don’t have a replication status, or they have a status of FAILED.
  • Amazon Athena and AWS Glue to expose the S3 inventory files as a table.
  • Amazon EMR to execute an Apache Spark job that queries the AWS Glue table and performs the copy-in-place operation.

Object filtering

To reduce the size of the problem (we’ve seen buckets with billions of objects!) and eliminate S3 List operations, we use Amazon S3 inventory. S3 inventory is enabled at the bucket level, and it provides a report of S3 objects. The inventory files contain the objects’ replication status: PENDING, COMPLETED, FAILED, or REPLICA. Pre-existing objects do not have a replication status in the inventory.

Interactive analysis

To simplify working with the files that are created by S3 inventory, we create a table in the AWS Glue Data Catalog. You can query this table using Amazon Athena and analyze the objects.  You can also use this table in the Spark job running on Amazon EMR to identify the objects to copy in place.

Copy-in-place execution

We use a Spark job running on Amazon EMR to perform concurrent copy-in-place operations of the S3 objects. This step allows the number of simultaneous copy operations to be scaled up. This improves performance on a large number of objects compared to doing the copy operations consecutively with a single-threaded application.

Account setup

For the purpose of this example, we created three S3 buckets. The buckets are specific to our demonstration. If you’re following along, you need to create your own buckets (with different names).

We’re using a source bucket named crr-preexisting-demo-source and a destination bucket named crr-preexisting-demo-destination. The source bucket contains the pre-existing objects and the objects with the replication status of FAILED. We store the S3 inventory files in a third bucket named crr-preexisting-demo-inventory.

The following diagram illustrates the basic setup.

You can use any bucket to store the inventory, but the bucket policy must include the following statement (change Resource and aws:SourceAccount to match yours).

{
    "Version": "2012-10-17",
    "Id": "S3InventoryPolicy",
    "Statement": [
        {
            "Sid": "S3InventoryStatement",
            "Effect": "Allow",
            "Principal": {
                "Service": "s3.amazonaws.com"
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::crr-preexisting-demo-inventory/*",
            "Condition": {
                "StringEquals": {
                    "s3:x-amz-acl": "bucket-owner-full-control",
                    "aws:SourceAccount": "111111111111"
                }
            }
        }
    ]
}

In our example, we uploaded six objects to crr-preexisting-demo-source. We added three objects (preexisting-*.txt) before CRR was enabled. We also added three objects (failed-*.txt) after permissions were removed from the CRR IAM role, causing CRR to fail.

Enable S3 inventory

You need to enable S3 inventory on the source bucket. You can do this on the Amazon S3 console as follows:

On the Management tab for the source bucket, choose Inventory.

Choose Add new, and complete the settings as shown, choosing the CSV format and selecting the Replication status check box. For detailed instructions for creating an inventory, see How Do I Configure Amazon S3 Inventory? in the Amazon S3 Console User Guide.

After enabling S3 inventory, you need to wait for the inventory files to be delivered. It can take up to 48 hours to deliver the first report. If you’re following the demo, ensure that the inventory report is delivered before proceeding.

Here’s what our example inventory file looks like:

You can also look on the S3 console on the objects’ Overview tab. The pre-existing objects do not have a replication status, but the failed objects show the following:

Register the table in the AWS Glue Data Catalog using Amazon Athena

To be able to query the inventory files using SQL, first you need to create an external table in the AWS Glue Data Catalog. Open the Amazon Athena console at https://console.aws.amazon.com/athena/home.

On the Query Editor tab, run the following SQL statement. This statement registers the external table in the AWS Glue Data Catalog.

CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo (
    `bucket` string,
    key string,
    replication_status string
)
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    ESCAPED BY '\\'
    LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION 's3://crr-preexisting-demo-inventory/crr-preexisting-demo-source/crr-preexisting-demo/hive';

After creating the table, you need to make the AWS Glue Data Catalog aware of any existing data and partitions by adding partition metadata to the table. To do this, you use the Metastore Consistency Check utility to scan for and add partition metadata to the AWS Glue Data Catalog.

MSCK REPAIR TABLE crr_preexisting_demo;

To learn more about why this is required, see the documentation on MSCK REPAIR TABLE and data partitioning in the Amazon Athena User Guide.

Now that the table and partitions are registered in the Data Catalog, you can query the inventory files with Amazon Athena.

SELECT * FROM crr_preexisting_demo where dt='2019-02-24-04-00';

The results of the query are as follows.

The query returns all rows in the S3 inventory for a specific delivery date. You’re now ready to launch an EMR cluster to copy in place the pre-existing and failed objects.

Note: If your goal is to fix FAILED objects, make sure that you correct what caused the failure (IAM permissions or S3 bucket policies) before proceeding to the next step.

Create an EMR cluster to copy objects

To parallelize the copy-in-place operations, run a Spark job on Amazon EMR. To facilitate EMR cluster creation and EMR step submission, we wrote a bash script (available in this GitHub repository).

To run the script, clone the GitHub repo. Then launch the EMR cluster as follows:

$ git clone https://github.com/aws-samples/amazon-s3-crr-preexisting-objects
$ ./launch emr.sh

Note: Running the bash script results in AWS charges. By default, it creates two Amazon EC2 instances, one m4.xlarge and one m4.2xlarge. Auto-termination is enabled so when the cluster is finished with the in-place copies, it terminates.

The script performs the following tasks:

  1. Creates the default EMR roles (EMR_EC2_DefaultRole and EMR_DefaultRole).
  2. Uploads the files used for bootstrap actions and steps to Amazon S3 (we use crr-preexisting-demo-inventory to store these files).
  3. Creates an EMR cluster with Apache Spark installed using the create-cluster

After the cluster is provisioned:

  1. A bootstrap action installs boto3 and awscli.
  2. Two steps execute, copying the Spark application to the master node and then running the application.

The following are highlights from the Spark application. You can find the complete code for this example in the amazon-s3-crr-preexisting-objects repo on GitHub.

Here we select records from the table registered with the AWS Glue Data Catalog, filtering for objects with a replication_status of "FAILED" or “”.

query = """
        SELECT bucket, key
        FROM {}
        WHERE dt = '{}'
        AND (replication_status = '""'
        OR replication_status = '"FAILED"')
        """.format(inventory_table, inventory_date)

print('Query: {}'.format(query))

crr_failed = spark.sql(query)

We call the copy_object function for each key returned by the previous query.

def copy_object(self, bucket, key, copy_acls):
        dest_bucket = self._s3.Bucket(bucket)
        dest_obj = dest_bucket.Object(key)

        src_bucket = self._s3.Bucket(bucket)
        src_obj = src_bucket.Object(key)

        # Get the S3 Object's Storage Class, Metadata, 
        # and Server Side Encryption
        storage_class, metadata, sse_type, last_modified = \
            self._get_object_attributes(src_obj)

        # Update the Metadata so the copy will work
        metadata['forcedreplication'] = runtime

        # Get and copy the current ACL
        if copy_acls:
            src_acl = src_obj.Acl()
            src_acl.load()
            dest_acl = {
                'Grants': src_acl.grants,
                'Owner': src_acl.owner
            }

        params = {
            'CopySource': {
                'Bucket': bucket,
                'Key': key
            },
            'MetadataDirective': 'REPLACE',
            'TaggingDirective': 'COPY',
            'Metadata': metadata,
            'StorageClass': storage_class
        }

        # Set Server Side Encryption
        if sse_type == 'AES256':
            params['ServerSideEncryption'] = 'AES256'
        elif sse_type == 'aws:kms':
            kms_key = src_obj.ssekms_key_id
            params['ServerSideEncryption'] = 'aws:kms'
            params['SSEKMSKeyId'] = kms_key

        # Copy the S3 Object over the top of itself, 
        # with the Storage Class, updated Metadata, 
        # and Server Side Encryption
        result = dest_obj.copy_from(**params)

        # Put the ACL back on the Object
        if copy_acls:
            dest_obj.Acl().put(AccessControlPolicy=dest_acl)

        return {
            'CopyInPlace': 'TRUE',
            'LastModified': str(result['CopyObjectResult']['LastModified'])
        }

Note: The Spark application adds a forcedreplication key to the objects’ metadata. It does this because Amazon S3 doesn’t allow you to copy in place without changing the object or its metadata.

Verify the success of the EMR job by running a query in Amazon Athena

The Spark application outputs its results to S3. You can create another external table with Amazon Athena and register it with the AWS Glue Data Catalog. You can then query the table with Athena to ensure that the copy-in-place operation was successful.

CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo_results (
  `bucket` string,
  key string,
  replication_status string,
  last_modified string
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
  STORED AS TEXTFILE
LOCATION 's3://crr-preexisting-demo-inventory/results';

SELECT * FROM crr_preexisting_demo_results;

The results appear as follows on the console.

Although this shows that the copy-in-place operation was successful, CRR still needs to replicate the objects. Subsequent inventory files show the objects’ replication status as COMPLETED. You can also verify on the console that preexisting-*.txt and failed-*.txt are COMPLETED.

It is worth noting that because CRR requires versioned buckets, the copy-in-place operation produces another version of the objects. You can use S3 lifecycle policies to manage noncurrent versions.

Conclusion

In this post, we showed how to use Amazon S3 inventory, Amazon Athena, the AWS Glue Data Catalog, and Amazon EMR to perform copy-in-place operations on pre-existing and failed objects at scale.

Note: Amazon S3 batch operations is an alternative for copying objects. The difference is that S3 batch operations will not check each object’s existing properties and set object ACLs, storage class, and encryption on an object-by-object basis. For more information, see Introduction to Amazon S3 Batch Operations in the Amazon S3 Console User Guide.

 


About the Authors

Michael Sambol is a senior consultant at AWS. He holds an MS in computer science from Georgia Tech. Michael enjoys working out, playing tennis, traveling, and watching Western movies.

 

 

 

 

Chauncy McCaughey is a senior data architect at AWS. His current side project is using statistical analysis of driving habits and traffic patterns to understand how he always ends up in the slow lane.