Tag Archives: Data Pipeline

Use Macie to discover sensitive data as part of automated data pipelines

Post Syndicated from Brandon Wu original https://aws.amazon.com/blogs/security/use-macie-to-discover-sensitive-data-as-part-of-automated-data-pipelines/

Data is a crucial part of every business and is used for strategic decision making at all levels of an organization. To extract value from their data more quickly, Amazon Web Services (AWS) customers are building automated data pipelines—from data ingestion to transformation and analytics. As part of this process, my customers often ask how to prevent sensitive data, such as personally identifiable information, from being ingested into data lakes when it’s not needed. They highlight that this challenge is compounded when ingesting unstructured data—such as files from process reporting, text files from chat transcripts, and emails. They also mention that identifying sensitive data inadvertently stored in structured data fields—such as in a comment field stored in a database—is also a challenge.

In this post, I show you how to integrate Amazon Macie as part of the data ingestion step in your data pipeline. This solution provides an additional checkpoint that sensitive data has been appropriately redacted or tokenized prior to ingestion. Macie is a fully managed data security and privacy service that uses machine learning and pattern matching to discover sensitive data in AWS.

When Macie discovers sensitive data, the solution notifies an administrator to review the data and decide whether to allow the data pipeline to continue ingesting the objects. If allowed, the objects will be tagged with an Amazon Simple Storage Service (Amazon S3) object tag to identify that sensitive data was found in the object before progressing to the next stage of the pipeline.

This combination of automation and manual review helps reduce the risk that sensitive data—such as personally identifiable information—will be ingested into a data lake. This solution can be extended to fit your use case and workflows. For example, you can define custom data identifiers as part of your scans, add additional validation steps, create Macie suppression rules to archive findings automatically, or only request manual approvals for findings that meet certain criteria (such as high severity findings).

Solution overview

Many of my customers are building serverless data lakes with Amazon S3 as the primary data store. Their data pipelines commonly use different S3 buckets at each stage of the pipeline. I refer to the S3 bucket for the first stage of ingestion as the raw data bucket. A typical pipeline might have separate buckets for raw, curated, and processed data representing different stages as part of their data analytics pipeline.

Typically, customers will perform validation and clean their data before moving it to a raw data zone. This solution adds validation steps to that pipeline after preliminary quality checks and data cleaning are performed, noted in blue (in layer 3) of Figure 1. The layers outlined in the pipeline are:

  1. Ingestion – Brings data into the data lake.
  2. Storage – Provides durable, scalable, and secure components to store the data—typically using S3 buckets.
  3. Processing – Transforms data into a consumable state through data validation, cleanup, normalization, transformation, and enrichment. This processing layer is where the additional validation steps are added to identify instances of sensitive data that haven’t been appropriately redacted or tokenized prior to consumption.
  4. Consumption – Provides tools to gain insights from the data in the data lake.

 

Figure 1: Data pipeline with sensitive data scan

The application runs on a scheduled basis (four times a day, every 6 hours by default) to process data that is added to the raw data S3 bucket. You can customize the application to perform a sensitive data discovery scan during any stage of the pipeline. Because most customers do their extract, transform, and load (ETL) daily, the application scans for sensitive data on a scheduled basis before any crawler jobs run to catalog the data and after typical validation and data redaction or tokenization processes complete.
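
The schedule itself is an EventBridge rule. If you want to change the cadence outside of the template, an update along the following lines works; this is a minimal boto3 sketch, and the rule name shown here is a placeholder rather than the name the deployed stack actually creates.

import boto3

events = boto3.client("events")

# Hypothetical rule name; the deployed stack defines its own resource name.
events.put_rule(
    Name="sensitive-data-scan-schedule",
    ScheduleExpression="rate(6 hours)",  # run the scan workflow every 6 hours
    State="ENABLED",
)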

You can expect that this additional validation will add 5–10 minutes to your pipeline execution at a minimum. The validation processing time will scale linearly based on object size, but there is a start-up time per job that is constant.

If sensitive data is found in the objects, an email is sent to the designated administrator requesting an approval decision; the administrator approves or denies the next step by selecting the corresponding link in the email. In most cases, the reviewer will choose to adjust the sensitive data cleanup processes to remove the sensitive data, deny the progression of the files, and re-ingest the files into the pipeline.

Additional considerations for deploying this application for regular use are discussed at the end of the blog post.

Application components

The following resources are created as part of the application:

  • A Step Functions state machine that orchestrates the sensitive data scan workflow, and an EventBridge scheduled rule that starts it
  • The Lambda functions described in the application logic below (triggerMacieScan, checkMacieStatus, getMacieFindingsCount, triggerManualApproval, moveAllScanStageS3Files, receiveApprovalDecision, deleteManualReviewS3Files, and moveToScannedDataS3Files)
  • Four S3 buckets: raw data, scan stage, manual review, and scanned data
  • The ApprovalRequestNotification Amazon SNS topic and an Amazon API Gateway REST API (ReceiveApprovalDecisionAPI) used for the manual review step
  • Amazon Macie, enabled optionally through the EnableMacie parameter

Note: The application uses various AWS services, and there are costs associated with these resources after the Free Tier usage. See AWS Pricing for details. The primary drivers of the solution cost will be the amount of data ingested through the pipeline, both for Amazon S3 storage and data processed for sensitive data discovery with Macie.

The architecture of the application is shown in Figure 2 and described in the text that follows.
 

Figure 2: Application architecture and logic

Application logic

  1. Objects are uploaded to the raw data S3 bucket as part of the data ingestion process.
  2. A scheduled EventBridge rule runs the sensitive data scan Step Functions workflow.
  3. triggerMacieScan Lambda function moves objects from the raw data S3 bucket to the scan stage S3 bucket.
  4. triggerMacieScan Lambda function creates a Macie sensitive data discovery job on the scan stage S3 bucket.
  5. checkMacieStatus Lambda function checks the status of the Macie sensitive data discovery job.
  6. isMacieStatusCompleteChoice Step Functions Choice state checks whether the Macie sensitive data discovery job is complete.
    1. If yes, the getMacieFindingsCount Lambda function runs.
    2. If no, the Step Functions Wait state waits 60 seconds and then restarts Step 5.
  7. getMacieFindingsCount Lambda function counts all of the findings from the Macie sensitive data discovery job.
  8. isSensitiveDataFound Step Functions Choice state checks whether sensitive data was found in the Macie sensitive data discovery job.
    1. If there was sensitive data discovered, run the triggerManualApproval Lambda function.
    2. If there was no sensitive data discovered, run the moveAllScanStageS3Files Lambda function.
  9. moveAllScanStageS3Files Lambda function moves all of the objects from the scan stage S3 bucket to the scanned data S3 bucket.
  10. triggerManualApproval Lambda function tags and moves objects with sensitive data discovered to the manual review S3 bucket, and moves objects with no sensitive data discovered to the scanned data S3 bucket. The function then sends a notification to the ApprovalRequestNotification Amazon SNS topic as a notification that manual review is required.
  11. Email is sent to the email address that’s subscribed to the ApprovalRequestNotification Amazon SNS topic (from the application deployment template) for the manual review user with the option to Approve or Deny pipeline ingestion for these objects.
  12. Manual review user assesses the objects with sensitive data in the manual review S3 bucket and selects the Approve or Deny links in the email.
  13. The decision request is sent from the Amazon API Gateway to the receiveApprovalDecision Lambda function.
  14. manualApprovalChoice Step Functions Choice state checks the decision from the manual review user.
    1. If denied, run the deleteManualReviewS3Files Lambda function.
    2. If approved, run the moveToScannedDataS3Files Lambda function.
  15. deleteManualReviewS3Files Lambda function deletes the objects from the manual review S3 bucket.
  16. moveToScannedDataS3Files Lambda function moves the objects from the manual review S3 bucket to the scanned data S3 bucket.
  17. The next step of the automated data pipeline will begin with the objects in the scanned data S3 bucket.
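
Steps 3 and 4 stage the objects and then start a one-time Macie sensitive data discovery job scoped to the scan stage bucket. The following boto3 sketch shows roughly what that job creation looks like; the bucket and account values are placeholders, and the deployed triggerMacieScan function may set additional options (such as custom data identifiers).

import uuid
import boto3

macie = boto3.client("macie2")

def start_discovery_job(account_id, scan_stage_bucket):
    # One-time classification job scoped to the scan stage S3 bucket.
    response = macie.create_classification_job(
        jobType="ONE_TIME",
        name=f"pipeline-scan-{uuid.uuid4()}",
        clientToken=str(uuid.uuid4()),  # idempotency token
        s3JobDefinition={
            "bucketDefinitions": [
                {"accountId": account_id, "buckets": [scan_stage_bucket]}
            ]
        },
    )
    return response["jobId"]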

Prerequisites

For this application, you need the following prerequisites:

You can use AWS Cloud9 to deploy the application. AWS Cloud9 includes the AWS CLI and AWS SAM CLI to simplify setting up your development environment.

Deploy the application with AWS SAM CLI

You can deploy this application using the AWS SAM CLI. AWS SAM uses AWS CloudFormation as the underlying deployment mechanism. AWS SAM is an open-source framework that you can use to build serverless applications on AWS.

To deploy the application

  1. Initialize the serverless application using the AWS SAM CLI from the GitHub project in the aws-samples repository. This will clone the project locally which includes the source code for the Lambda functions, Step Functions state machine definition file, and the AWS SAM template. On the command line, run the following:
    sam init --location gh:aws-samples/amazonmacie-datapipeline-scan
    

    Alternatively, you can clone the GitHub project directly.

  2. Deploy your application to your AWS account. On the command line, run the following:
    sam deploy --guided
    

    Complete the prompts during the guided interactive deployment. The first deployment prompt is shown in the following example.

    Configuring SAM deploy
    ======================
    
            Looking for config file [samconfig.toml] :  Found
            Reading default arguments  :  Success
    
            Setting default arguments for 'sam deploy'
            =========================================
            Stack Name [maciepipelinescan]:
    

  3. Settings:
    • Stack Name – Name of the CloudFormation stack to be created.
    • AWS Region – The Region—for example, us-west-2, eu-west-1, ap-southeast-1—to deploy the application to. This application was tested in the us-west-2 and ap-southeast-1 Regions. Before selecting a Region, verify that the services you need are available in those Regions (for example, Macie and Step Functions).
    • Parameter StepFunctionName – Name of the Step Functions state machine to be created—for example, maciepipelinescanstatemachine.
    • Parameter BucketNamePrefix – Prefix to apply to the S3 buckets to be created (S3 bucket names are globally unique, so choosing a random prefix helps ensure uniqueness).
    • Parameter ApprovalEmailDestination – Email address to receive the manual review notification.
    • Parameter EnableMacie – Whether you need Macie enabled in your account or Region. Select yes if you need Macie to be enabled for you as part of this template, or no if you already have Macie enabled.
  4. Confirm changes and provide approval for AWS SAM CLI to deploy the resources to your AWS account by responding y to prompts, as shown in the following example. You can accept the defaults for the SAM configuration file and SAM configuration environment prompts.
    #Shows you resources changes to be deployed and require a 'Y' to initiate deploy
    Confirm changes before deploy [y/N]: y
    #SAM needs permission to be able to create roles to connect to the resources in your template
    Allow SAM CLI IAM role creation [Y/n]: y
    ReceiveApprovalDecisionAPI may not have authorization defined, Is this okay? [y/N]: y
    ReceiveApprovalDecisionAPI may not have authorization defined, Is this okay? [y/N]: y
    Save arguments to configuration file [Y/n]: y
    SAM configuration file [samconfig.toml]: 
    SAM configuration environment [default]:
    

    Note: This application deploys an Amazon API Gateway with two REST API resources without authorization defined to receive the decision from the manual review step. You will be prompted to accept each resource without authorization. A token (Step Functions taskToken) is used to authenticate the requests.

  5. This creates an AWS CloudFormation changeset. Once the changeset creation is complete, you must provide a final confirmation of y at the Deploy this changeset? [y/N] prompt, as shown in the following example.
    Changeset created successfully. arn:aws:cloudformation:ap-southeast-1:XXXXXXXXXXXX:changeSet/samcli-deploy1605213119/db681961-3635-4305-b1c7-dcc754c7XXXX
    
    
    Previewing CloudFormation changeset before deployment
    ======================================================
    Deploy this changeset? [y/N]:
    

Your application is deployed to your account using AWS CloudFormation. You can track the deployment events in the command prompt or via the AWS CloudFormation console.

After the application deployment is complete, you must confirm the subscription to the Amazon SNS topic. An email will be sent to the email address entered in Step 3 with a link that you need to select to confirm the subscription. This confirmation provides opt-in consent for AWS to send emails to you via the specified Amazon SNS topic. The emails will be notifications of potentially sensitive data that need to be approved. If you don’t see the verification email, be sure to check your spam folder.
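
As the note in step 4 mentions, the two unauthenticated API Gateway resources are protected by the Step Functions taskToken embedded in the Approve and Deny links. When the reviewer selects a link, the receiveApprovalDecision function resumes the paused workflow by returning that token. A hedged boto3 sketch of that callback (the parameter names here are illustrative, not the function's actual event shape):

import json
import boto3

sfn = boto3.client("stepfunctions")

def resume_workflow(task_token, decision):
    # The task token was issued when the workflow paused for manual review;
    # returning it is what authenticates and resumes that specific execution.
    sfn.send_task_success(
        taskToken=task_token,
        output=json.dumps({"decision": decision}),  # "approve" or "deny"
    )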

Test the application

The application uses an EventBridge scheduled rule to start the sensitive data scan workflow, which runs every 6 hours. You can manually start an execution of the workflow to verify that it’s working. To test the function, you will need a file that contains data that matches your rules for sensitive data. For example, it is easy to create a spreadsheet, document, or text file that contains names, addresses, and numbers formatted like credit card numbers. You can also use this generated sample data to test Macie.

We will test by uploading a file to our S3 bucket via the AWS web console. If you know how to copy objects from the command line, that also works.

Upload test objects to the S3 bucket

  1. Navigate to the Amazon S3 console and upload one or more test objects to the <BucketNamePrefix>-data-pipeline-raw bucket. <BucketNamePrefix> is the prefix you entered when deploying the application in the AWS SAM CLI prompts. You can use any objects as long as they’re a supported file type for Amazon Macie. I suggest uploading multiple objects, some with and some without sensitive data, in order to see how the workflow processes each.

Start the Scan State Machine

  1. Navigate to the Step Functions state machines console. If you don’t see your state machine, make sure you’re connected to the same Region that you deployed your application to.
  2. Choose the state machine you created using the AWS SAM CLI as seen in Figure 3. The example state machine is maciepipelinescanstatemachine, but you might have used a different name in your deployment.
     
    Figure 3: AWS Step Functions state machines console

  3. Select the Start execution button and copy the value from the Enter an execution name – optional box. Change the Input – optional value replacing <execution id> with the value just copied as follows:
    {
        "id": "<execution id>"
    }
    

    In my example, the <execution id> is fa985a4f-866b-b58b-d91b-8a47d068aa0c from the Enter an execution name – optional box as shown in Figure 4. You can choose a different ID value if you prefer. This ID is used by the workflow to tag the objects being processed to ensure that only objects that are scanned continue through the pipeline. When the EventBridge scheduled event starts the workflow as scheduled, an ID is included in the input to the Step Functions workflow. Then select Start execution again.
     

    Figure 4: New execution dialog box

  4. You can see the status of your workflow execution in the Graph inspector as shown in Figure 5. In the figure, the workflow is at the pollForCompletionWait step.
     
    Figure 5: AWS Step Functions graph inspector

The sensitive data discovery job should run for about five to ten minutes. Jobs scale linearly based on object size, but there is a constant start-up time per job. If sensitive data is found in the objects uploaded to the <BucketNamePrefix>-data-pipeline-raw S3 bucket, an email is sent to the address provided during the AWS SAM deployment step requesting an approval decision, which the reviewer indicates by selecting the link corresponding to their decision to approve or deny the next step, as shown in Figure 6.
 

Figure 6: Sensitive data identified email

When you receive this notification, you can investigate the findings by reviewing the objects in the <BucketNamePrefix>-data-pipeline-manual-review S3 bucket. Based on your review, you can either apply remediation steps to remove any sensitive data or allow the data to proceed to the next step of the data ingestion pipeline. You should define a standard response process to address discovery of sensitive data in the data pipeline. Common remediation steps include reviewing the files for sensitive data, deleting the files that you do not want to progress, and updating the ETL process to redact or tokenize sensitive data when re-ingesting into the pipeline. When you re-ingest the files into the pipeline without sensitive data, the files will not be flagged by Macie.

The workflow performs the following:

  • If you select Approve, the files are moved to the <BucketNamePrefix>-data-pipeline-scanned-data S3 bucket with an Amazon S3 SensitiveDataFound object tag with a value of true.
  • If you select Deny, the files are deleted from the <BucketNamePrefix>-data-pipeline-manual-review S3 bucket.
  • If no action is taken, the Step Functions workflow execution times out after five days and the file will automatically be deleted from the <BucketNamePrefix>-data-pipeline-manual-review S3 bucket after 10 days.
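
If you want to see what the approve path does to each object, the combination of tag, copy, and delete can be sketched with boto3 as follows. The SensitiveDataFound tag key comes from the solution; the helper name and bucket variables are placeholders, and the deployed Lambda functions may differ in detail.

import boto3

s3 = boto3.client("s3")

def approve_object(review_bucket, scanned_bucket, key):
    # Record that Macie flagged this object before it continues downstream.
    s3.put_object_tagging(
        Bucket=review_bucket,
        Key=key,
        Tagging={"TagSet": [{"Key": "SensitiveDataFound", "Value": "true"}]},
    )
    # S3 has no native move, so copy to the scanned data bucket, then delete the original.
    s3.copy_object(
        Bucket=scanned_bucket,
        Key=key,
        CopySource={"Bucket": review_bucket, "Key": key},
        TaggingDirective="COPY",  # carry the object tag along with the copy
    )
    s3.delete_object(Bucket=review_bucket, Key=key)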

Clean up the application

You’ve successfully deployed and tested the sensitive data pipeline scan workflow. To avoid ongoing charges for resources you created, you should delete all associated resources by deleting the CloudFormation stack. In order to delete the CloudFormation stack, you must first delete all objects that are stored in the S3 buckets that you created for the application.

To delete the application

  1. Empty the S3 buckets created in this application (<BucketNamePrefix>-data-pipeline-raw S3 bucket, <BucketNamePrefix>-data-pipeline-scan-stage, <BucketNamePrefix>-data-pipeline-manual-review, and <BucketNamePrefix>-data-pipeline-scanned-data).
  2. Delete the CloudFormation stack used to deploy the application.

Considerations for regular use

Before using this application in a production data pipeline, you will need to stop and consider some practical matters. First, the notification mechanism used when sensitive data is identified in the objects is email. Email doesn’t scale: you should expand this solution to integrate with your ticketing or workflow management system. If you choose to use email, subscribe a mailing list so that the work of reviewing and responding to alerts is shared across a team.

Second, the application runs on a scheduled basis (every 6 hours by default). You should consider starting the application when your preliminary validations have completed and you are ready to perform a sensitive data scan as part of your pipeline. You can modify the EventBridge rule to run in response to an Amazon EventBridge event instead of on a schedule.

Third, the application currently uses a 60-second Step Functions Wait state when polling for the Macie discovery job completion. In real-world scenarios, the discovery scan will take at least 10 minutes, and typically far longer than the 60-second polling interval. You should evaluate the typical execution times of your discovery jobs and tune the polling period accordingly. This will help reduce costs related to running Lambda functions and log storage within CloudWatch Logs. The polling period is defined in the Step Functions state machine definition file (macie_pipeline_scan.asl.json) under the pollForCompletionWait state.

Fourth, the application currently doesn’t account for false positives in the sensitive data discovery job results. Also, the application will progress or delete all objects identified based on the decision by the reviewer. You should consider expanding the application to handle false positives through automation rather than manual review / intervention (such as deleting the files from the manual review bucket or removing the sensitive data tags applied).

Last, the solution will stop the ingestion of a subset of objects into your pipeline. This behavior is similar to other validation and data quality checks that most customers perform as part of the data pipeline. However, you should test to ensure that this will not cause unexpected outcomes and address them in your downstream application logic accordingly.

Conclusion

In this post, I showed you how to integrate sensitive data discovery using Macie as an additional validation step in an automated data pipeline. You’ve reviewed the components of the application, deployed it using the AWS SAM CLI, tested to validate that the application functions as expected, and cleaned up by removing deployed resources.

You now know how to integrate sensitive data scanning into your ETL pipeline. You can use automation and—where required—manual review to help reduce the risk of sensitive data, such as personally identifiable information, being inadvertently ingested into a data lake. You can take this application and customize it to fit your use case and workflows, such as using custom data identifiers as part of your scans, adding additional validation steps, creating Macie suppression rules to archive findings automatically, or requesting manual approvals only for findings that meet certain criteria (such as high severity findings).

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on the Amazon Macie forum.

Want more AWS Security how-to content, news, and feature announcements? Follow us on Twitter.

Author

Brandon Wu

Brandon is a security solutions architect helping financial services organizations secure their critical workloads on AWS. In his spare time, he enjoys exploring outdoors and experimenting in the kitchen.

Securing and managing multi-cloud Presto Clusters with Grab’s DataGateway

Post Syndicated from Grab Tech original https://engineering.grab.com/data-gateway

Introduction

Data is the lifeblood of Grab and the insights we gain from it drive all the most critical business decisions made by Grabbers and our leaders every day.

Grab’s Data Engineering (DE) team is responsible for maintaining the data platform, which consists of data pipelines, job schedulers, and the query/computation engines that are the key components for generating insights from data. SQL is the core language for analytics at Grab and as of early 2020, our Presto platform serves about 200 user groups that add up to 500 users who run 350,000 queries every day. These queries span 10,000 tables and process up to 1PB of data daily.

In 2016, we started the DataGateway project to enable us to manage data access for the hundreds of Grabbers who needed access to Presto for their work. Since then, DataGateway has grown to become much more than just an access control mechanism for Presto. In this blog, we want to share what we’ve achieved since the initial launch of the project.

The problems we wanted to solve

As we were reviewing the key challenges around data access in Grab and assessing possible solutions, we came up with this prioritized list of user requirements we wanted to work on:

  • Use a single endpoint to serve everyone.
  • Manage user access to clusters, schemas, tables, and fields.
  • Provide seamless user experience when Presto clusters are scaled up/down, in/out, or provisioned/decommissioned.
  • Capture audit trail of user activities.

To meet Grabbers’ critical need for interactive querying, as well as for extract, transform, load (ETL) jobs, we evaluated several technologies. Presto was among the ones we evaluated, and was what we eventually chose although it didn’t meet all of our requirements out of the box. In order to address these gaps, we came up with the idea of a security gateway for the Presto compute engine that could also act as a load balancer/proxy. This is how we ended up creating DataGateway.

DataGateway is a service that sits between clients and Presto clusters. It is essentially a smart HTTP proxy server, an abstraction layer on top of the Presto clusters, that handles the following actions:

  1. Parse incoming SQL statements to get requested schemas, tables, and fields.
  2. Manage user Access Control List (ACL) to limit users’ data access by checking against the SQL parsing results.
  3. Manage users’ cluster access.
  4. Redirect users’ traffic to the authorized clusters.
  5. Show meaningful error messages to users whenever the query is rejected or exceptions from clusters are encountered.

Anatomy of DataGateway

The DataGateway’s key components are as follows:

  • API Service
  • SQL Parser
  • Auth framework
  • Administration UI

We leveraged Kubernetes to run all these components as microservices.

Figure 1. DataGateway Key Components

API Service

This is the component that manages all user-facing and cluster-facing processes. We integrated this service with the Presto API, which means it appears to clients just like a Presto cluster. It accepts query requests from clients, gets the parsing result from the SQL Parser, and runs authorization through the Auth Framework.

If everything is good to go, the API Service forwards queries to the assigned clusters and continues the entire query process.

Auth Framework

This handles both authentication and authorization requests. It stores the users’ ACLs and communicates with the API Service and the SQL Parser to run the entire authentication process. But why is it a microservice instead of a module in the API Service, you ask? It’s because we keep evolving the security checks at Grab to ensure that everything is compliant with our security requirements, especially when dealing with data.

We wanted to make it flexible to fulfill ad-hoc requests from the security team without affecting the API Service. Furthermore, there are different authentication methods out there that we might need to deal with (OAuth2, SSO, you name it). The API Service supports multiple authentication frameworks that enable different authentication methods for different users.

SQL Parser

This is a SQL parsing engine that extracts schemas, tables, and fields by reading SQL statements. Since Presto SQL parsing works differently in each version, we compile multiple SQL Parsers matching the versions of the Presto clusters we run. The SQL Parser becomes the single source of truth.

Admin UI

This is a UI for Presto administrators to manage clusters and user access, as well as to select an authentication framework, making it easier for the administrators to deal with the entire ecosystem.

How we deployed DataGateway using Kubernetes

In the past couple of years, we’ve had significant growth in workloads from analysts and data scientists. As we were very enthusiastic about Kubernetes, DataGateway was chosen as one of the earliest services for deployment in Kubernetes. DataGateway on Kubernetes is highly available and fully scalable to handle traffic from users and systems.

We also tested the Horizontal Pod Autoscaler (HPA) feature of Kubernetes, which dynamically scales the number of pods in or out based on actual traffic and resource consumption.

Figure 2. DataGateway deployment using Kubernetes

Functionality of DataGateway

This section highlights some of the ways we use DataGateway to manage our Presto ecosystem efficiently.

Restrict users based on Schema/Table level access

In a setup where a Presto cluster is deployed on Amazon Elastic MapReduce (EMR) or Amazon Elastic Kubernetes Service (EKS), we configure an IAM role and attach it to the EMR or EKS nodes. The IAM role limits access to S3 storage. However, IAM only provides bucket-level and file-level control; it doesn’t meet our requirement for schema-, table-, and column-level ACLs. This is where DataGateway proves useful.

One of the DataGateway services is a SQL Parser. As previously covered, this is a service that parses a query and digs out the schemas and tables involved. The API service receives the parsing result, checks it against the user’s ACL, and decides whether to allow or reject the query. This is a remarkable improvement in our security control since we now have another layer to restrict access, on top of the S3 storage. We’ve implemented SQL-based access control down to the table level.

As shown in Figure 3, user A is trying to run the SQL statement select * from locations.cities. The SQL Parser reads the statement and tells the API service that user A is trying to read data from the table cities in the schema locations. Then, the API service checks against the ACL of user A. The service finds that user A only has read access to the table countries in schema locations. Eventually, the API service denies this attempt because user A doesn’t have read access to the table cities in the schema locations.

Figure 3. An example of how to check user access to run SQL statements

The above flow shows an access denied result because the user doesn’t have the appropriate permissions.
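
Stripped of the surrounding services, the authorization decision in Figure 3 is a set-membership check: the SQL Parser reports which schema.table pairs a statement touches, and they are compared against the user's ACL. A simplified Python sketch of that check (the real services are much richer and also cover cluster- and field-level rules):

# ACL: user -> set of "schema.table" entries the user may read.
USER_ACL = {
    "user_a": {"locations.countries"},
}

def authorize(user, tables_in_query):
    """Return (allowed, offending_table); tables_in_query comes from the SQL Parser."""
    allowed_tables = USER_ACL.get(user, set())
    for table in tables_in_query:
        if table not in allowed_tables:
            return False, table
    return True, None

# select * from locations.cities -> the parser reports {"locations.cities"}
ok, denied = authorize("user_a", {"locations.cities"})
# ok == False, denied == "locations.cities", so the API Service rejects the query.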

Seamless User Experience during the EMR migration

We use AWS EMR to deploy Presto as a SQL query engine since deployment is really easy. However, without DataGateway, any EMR operations such as terminations, new cluster deployments, config changes, and version upgrades would require quite a bit of user involvement. We would sometimes need users to make changes on their side; for example, asking users to change their endpoints to connect to suitable clusters.

With DataGateway, ACLs exist for each of the user accounts. The ACL includes the list of EMR clusters that users are allowed to access. As a Presto access management platform, DataGateway redirects user traffic to an appropriate cluster based on the ACL, like a proxy. Users always connect to the same endpoint we offer, which is the DataGateway. To switch over from one cluster to another, we just need to edit the cluster ACL and everything is handled seamlessly.

Figure 4. Cluster switching using DataGateway

Figure 4 highlights the case when we’re switching EMR from one cluster to another. No changes are required from users.

We executed the migration of our entire Presto platform from one AWS EMR instance to another using the same methodology, with little to no disruption for our users. Over a few phases spanning a couple of months, we moved 40 clusters serving hundreds of users who issue millions of queries daily.

In most cases, users didn’t have to make any changes on their end, they just continued using Presto as usual while we made the changes in the background.

Multi-Cloud Data Lake/Presto Cluster maintenance

Recently, we started to build and maintain data lakes not just in one cloud, but two – in AWS and Azure. Since most end-users are AWS-based, and each team has their own AWS sub-account to run their services and workloads, it would be a nightmare to bridge all the connections and access routes between these two clouds from end-to-end, sub-account by sub-account.

Here, the DataGateway plays the role of the multi-cloud gateway. Since all end-users’ AWS sub-accounts are peered to DataGateway’s network, everything becomes much easier to handle.

End-users retain the same Presto connection profile. The DE team then handles the connection setup from DataGateway to Azure, and also the deployment of Presto clusters in Azure.

When all is set, end-users use the same endpoint to DataGateway. We offer a feature called Cluster Switch that allows users to switch between the AWS Presto cluster and the Azure Presto cluster on the fly by setting parameters in the connection string. This feature allows users to switch to their target Presto cluster without any endpoint changes, and the switch takes effect instantly. That means users can run different queries in different clusters based on their requirements.

This feature has helped the DE team maintain Presto clusters easily. We can spin up different Presto clusters for different teams, so that each team has their own query engine to run their queries with dedicated resources.

Figure 5. Sub-account connections and Queries

Figure 5 shows an example of how sub-accounts connect to DataGateway and run queries on resources in different clouds and clusters.

Figure 6. Sample scenario without DataGateway

Figure 6 shows a scenario of what would happen if DataGateway didn’t exist. Each of the accounts would have to maintain its own connections, Virtual Private Cloud (VPC) peering, and express links to connect to our Presto resources.

Summary

DataGateway is playing a key role in Grab’s entire Presto ecosystem. It helps us manage user access and cluster selection through a single endpoint, ensuring that everyone is running their Presto queries in the same place. It also helps distribute workload to different types and versions of Presto clusters.

When we started to deploy DataGateway on Kubernetes, our vision for the Presto ecosystem underwent an epic change as it further motivated us to continuously improve. Since then, we’ve had new ideas on deployment methods and pipelines, microservice implementations, scaling strategy, and resource control; we even made use of Kubernetes to design an on-demand, container-based Presto cluster provisioning engine. We’ll share this in another engineering blog, so do stay tuned!

We also made crucial enhancements on data access control as we extended Presto’s access controls down to the schema/table-level.

In day-to-day operations, especially when we started to implement data lake in multiple clouds, DataGateway solved a lot of implementation issues. DataGateway made it simpler to switch a user’s Presto cluster from one cloud to another or allow a user to use a different Presto cluster using parameters. DataGateway allowed us to provide a seamless experience to our users.

Looking forward, we have more and more ideas for our Presto ecosystem, such as a Spark DataGateway or AWS Athena integrations, to keep our data safe at any time and to provide our users with a smoother experience when dealing with data used for analysis or research.


Authored by Vinnson Lee on behalf of the Presto Development Team at Grab – Edwin Law, Qui Hieu Nguyen, Rahul Penti, Wenli Wan, Wang Hui and the Data Engineering Team.


Join us

Grab is more than just the leading ride-hailing and mobile payments platform in Southeast Asia. We use data and technology to improve everything from transportation to payments and financial services across a region of more than 620 million people. We aspire to unlock the true potential of Southeast Asia and look for like-minded individuals to join us on this ride.

If you share our vision of driving South East Asia forward, apply to join our team today.

Data first, SLA always

Post Syndicated from Grab Tech original https://engineering.grab.com/data-first-sla-always

Introducing Trailblazer, the Data Engineering team’s solution for implementing change data capture across all upstream databases. In this article, we explain why we needed to move away from periodic batch ingestion towards a real-time solution, and show how we achieved this through an end-to-end streaming pipeline.

Context

Our mission as Grab’s Data Engineering team is to fulfill 100% of SLAs for data availability to our downstream users. Our 40-person team is responsible for providing accurate and reliable data to data analysts and data scientists so that they can produce actionable reports that will help Grab’s leadership team make data-driven decisions. We maintain data for a variety of business intelligence tools such as Tableau, Presto and Holistics as well as predictive algorithms for all of Grab.

We ingest data from multiple upstream sources, such as relational databases, Kafka or third party applications such as Salesforce or Zendesk. The majority of this source data exists in MySQL and we run ETL pipelines to mirror any updates into our data lake. These pipelines are triggered on an hourly or daily basis and are powered by an in-house Loader application which performs Spark batch ingestion and loading of data from source to sink.

Problems with the Loader application started to surface when Grab’s data exceeded the petabyte threshold. As such, for larger tables, the most practical method to ingest data was to perform ETL only on rows that were updated within a specified timeframe. This is akin to issuing the query

SELECT * FROM table WHERE updated >= [start_time] AND updated < [end_time]

Now imagine two situations. One, firing this query at a huge table without an updated field. Two, firing the same query at the huge table, this time without indexes on the updated field. In the first scenario, the query will never work and we can never perform incremental ingestion on the table based on a timed window. The second scenario carries the danger of creating high CPU load on the database replica that we are querying from. Neither has an ideal outcome.

One other problem that we identified was the unpredictability of growth in data volume. Tables smaller than one gigabyte were ingested by fully scanning the table and overwriting the data in the data lake. This worked out well for us until the table size increased exponentially, at which point our Spark jobs failed due to JDBC timeouts. If we were only dealing with a handful of tables, this issue could have been addressed by switching our data ingestion strategy from full scan to a timed window.

When assessing the issue, we discovered that there were hundreds of tables running under the full scan strategy, all of them potentially crashing our data system, all time bombs silently waiting to explode.

The team urgently needed a new approach to ETL. Our Loader application was highly coupled to upstream table characteristics. We needed to find solutions that were truly scalable, which meant decoupling our pipelines from the upstream.

Change data capture (CDC)

Much like event sourcing, any log change to the database is captured and streamed out for downstream applications to consume. This process is lightweight since any row level update to the table is instantly captured by a real time processor, avoiding the need for large chunked queries on the table. In addition, CDC works regardless of upstream table definition, so we do not need to worry about missing updated columns impacting our data migration process.

Binary Logs (binlogs) are the CDC agents of MySQL. All updates, insertions or deletions performed on the table are captured as a series of logged events containing the past state of the row and its newly modified state. Check out the binlogs reference to find out more.

In order to persist all binlogs generated upstream, our team created a Spark Structured Streaming application called Trailblazer. Trailblazer streams all MySQL binlogs to our data lake. These binlogs serve as a foundation for us to build Presto tables for data auditing and help to remove the direct dependency of our batch ETL jobs to the source MySQL.

Trailblazer is an amalgamation of various data streaming stacks. Binlogs are captured by Debezium, which runs on Kafka Connect clusters. All binlogs are sent to our Kafka cluster, which is managed by the Data Engineering Infrastructure team, and are streamed out to a real time bucket via a Spark structured streaming application. Hourly or daily ETL compaction jobs ingest the change logs from the real time bucket to materialize tables for downstream users to consume.

CDC in action where binlogs are streamed to Kafka via Debezium before being consumed by Trailblazer streaming & compaction services

 

Some statistics

To date, we are streaming hundreds of tables across 60 Spark streaming jobs and with the constant increase in Grab’s database instances, the numbers are expected to keep growing.

Designing Trailblazer streams

We built our streaming application using Spark structured streaming 2.3. Structured streaming was designed to remove the technical aspects of provisioning streams. Developers can focus on perfecting business logic without worrying about fundamentals such as checkpoint management or reading and writing to data sources.

Key architecture for Trailblazer streaming

 

In the design phase, we made sure to follow several key principles that helped in managing our streams.

Checkpoints have to be externally managed

Structured streaming manages checkpoints both in a local directory and in a ‘_metadata’ directory on S3 buckets, such that the state of the stream can be restored in the event of failure and restart.

This is all well and good, with two exceptions. First, changing the starting point of data ingestion meant ssh-ing into the machine and manipulating metadata, which could be extremely dangerous. Second, we could not assume cluster permanence, since clusters can die and be recreated with data erased from their local disks or the distributed file system.

Our solution was to do a workaround at the application level. All checkpoints are stored in temporary directories with the current timestamp appended to the path (e.g. /tmp/checkpoint/job_A/1560697200/… ). A linearly progressive timestamp guarantees that the same directory will never be reused by new instances of the stream. This is why we never restore a stream’s state from local disk; instead, we store all checkpoints in a highly available Redis cluster, with the Kafka topic as the key and a JSON map of partition : offset as the value.

Key

debz-schema-A.schema_A.table_B

Value

{"11":19183566,"12":19295602,"13":18992606[[a]](#cmnt1)[[b]](#cmnt2)[[c]](#cmnt3)[[d]](#cmnt4)[[e]](#cmnt5)[[f]](#cmnt6),"14":19269499,"15":19197199,"16":19060873,"17":19237853,"18":19107959,"19":19188181,"0":19193976,"1":19072585,"2":19205764,"3":19122454,"4":19231068,"5":19301523,"6":19287447,"7":19418871,"8":19152003,"9":19112431,"10":19151479}
Example of how offsets are stored in Redis as Key : Value pairs

 

Fortunately, structured streaming provides the StreamingQueryListener class, which we can use to register checkpoints after the completion of each microbatch.
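
A minimal sketch of the Redis side of this scheme, assuming a redis-py client and the key format from the example above; in the actual application the write happens from the query listener at the end of each microbatch.

import json
import redis

# Placeholder endpoint for the highly available Redis cluster.
r = redis.Redis(host="checkpoint-redis.example.internal", port=6379)

def save_offsets(kafka_topic, partition_offsets):
    # partition_offsets: {"0": 19193976, "1": 19072585, ...}
    # Overwrite the checkpoint for this topic with the latest processed offsets.
    r.set(kafka_topic, json.dumps(partition_offsets))

def load_offsets(kafka_topic):
    # Used on (re)start to build the starting offsets for the Kafka source.
    raw = r.get(kafka_topic)
    return json.loads(raw) if raw else None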

Streams must handle 0, 1 or 1 million records

Scalability is at the heart of all well-designed applications. Spark streaming jobs are built for scalability in the face of varying data volumes.

In general, the rate of messages input to Kafka is cyclical across 24 hrs. Streaming jobs should be robust enough to handle data loads during peak hours of the day without breaching microbatch timing

 

There are a few settings that we can configure to influence the degree of scalability for a streaming app:

  • spark.dynamicAllocation.enabled=true gives spark autonomy to provision / revoke executors to suit the workload
  • spark.dynamicAllocation.maxExecutors controls the maximum job parallelism
  • maxOffsetsPerTrigger controls the maximum number of messages ingested from Kafka per microbatch
  • trigger controls the duration between microbatches and is a property of the DataStreamWriter class
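
A rough PySpark sketch of where these knobs sit (the topic, bootstrap servers, and paths are placeholders); dynamic allocation is set on the Spark session or via spark-submit rather than on the stream itself.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("trailblazer-stream")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.maxExecutors", "20")
    .getOrCreate()
)

events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka.example.internal:9092")
    .option("subscribe", "debz-schema-A.schema_A.table_B")
    .option("maxOffsetsPerTrigger", 100000)  # cap messages ingested per microbatch
    .load()
)

query = (
    events.writeStream
    .trigger(processingTime="30 seconds")    # duration between microbatches
    .format("orc")
    .option("path", "s3://realtime-bucket/schema_A/table_B/")
    .option("checkpointLocation", "/tmp/checkpoint/job_A/1560697200/")
    .start()
)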

Data as key health indicator

Scaling the number of streaming jobs without prior collection of performance metrics is a bad idea. There is a high chance that you will discover a dead stream when checking your stream hours after initialization. I’ll cite Murphy’s law as proof.

Thus we vigilantly monitored our data streams. We used tools such as Datadog for metric monitoring, Slack for on-call issue reporting, PagerDuty for urgent cases and our in-house data auditor as a service (DASH) for count discrepancy reporting between streamed and source data. More details on monitoring are discussed later in this post.

Streams are ephemeral

Streams may die due to a hundred and one reasons so don’t blame yourself or your programming insecurities. Issues with upstream dependencies, such as a node within your Kafka cluster running out of disk space, could lead to partition unavailability which would crash the application. On one occasion, our streaming application was unable to resolve DNS when writing to AWS S3 storage. This amounted to multiple failures within our Spark job that eventually culminated in the termination of the stream.

In this case, allow the stream to shut down gracefully, send out your alerts and have a mechanism in place to retry the failed stream. We run all streaming jobs on Airflow and any failure to the stream will automatically be retried through a new task issued by the scheduler.

If you have had experience with large scale management of streams, please leave a comment so we can continue this discussion!

Monitoring data streams

Here are some key features that were set up to monitor our streams.

Running : Active jobs ratio

The number of streaming jobs could increase in the future, making it a challenge for the on-call team to track all jobs that are supposed to be up and running.

One proposal is to track the number of jobs in production against the number of jobs that are actually running. By querying MySQL tables, we can filter out all the jobs that are meant to be active. Since Trailblazer streams are spark-submit jobs managed by YARN, we can query YARN’s ResourceManager REST API to retrieve all the jobs that are running. We then construct a ratio of running : active jobs and report it to Datadog. If the ratio is not 1 for an extended duration, an alert will be issued for the on-call to take action.
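
A condensed Python sketch of that check, assuming the standard YARN ResourceManager REST API and a Datadog statsd client; the MySQL lookup of expected jobs is passed in rather than queried here.

import requests
from datadog import statsd

def running_jobs(rm_host):
    # YARN ResourceManager REST API: list applications currently in the RUNNING state.
    resp = requests.get(f"http://{rm_host}:8088/ws/v1/cluster/apps", params={"states": "RUNNING"})
    apps = resp.json().get("apps") or {}
    return {app["name"] for app in apps.get("app", [])}

def report_ratio(expected_jobs, rm_host):
    # expected_jobs: set of job names read from the MySQL table of active streams.
    running = running_jobs(rm_host)
    ratio = len(expected_jobs & running) / max(len(expected_jobs), 1)
    statsd.gauge("trailblazer.running_active_ratio", ratio)  # alert if it stays below 1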

If the ratio of running : active jobs falls below 1 for a period of time, we will immediately trigger an alert

 

Microbatch runtime

We define a 30 second window for each microbatch and track the actual runtime using metrics reported by the query listener. A runtime that exceeds the designated window is a potential indicator that the streaming job is deprived of resources and needs to be scaled up.

Job liveliness

Each job reports its health by emitting a heartbeat (a count of 1) at the end of every microbatch via a query listener. This is useful in detecting stale jobs (jobs that are registered as RUNNING in YARN but are actually hung).

Kafka offset divergence

To ensure that the rate at which we consume messages keeps up with the rate at which producers write them, we sum up all presently ingested topic-partition offsets and compare that value to the sum of all topic-partition end offsets in Kafka. We then add alerting logic on top of these metrics to inform the on-call team if the difference between the two values grows too big.

It is important to track the offset divergence parameter as streams can be lagging. Should the rate of consumption fall below the rate of message production, we would run the risk of falling short of Kafka’s retention window, leading to data losses.
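
A simplified sketch of the divergence computation using kafka-python, with the ingested offsets read from the Redis checkpoints described earlier (client setup and names are placeholders):

from kafka import KafkaConsumer, TopicPartition

def offset_divergence(topic, ingested_offsets, bootstrap_servers):
    # ingested_offsets: {"0": 19193976, "1": 19072585, ...} from the Redis checkpoint.
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
    partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
    end_offsets = consumer.end_offsets(partitions)  # latest offset per partition in Kafka

    produced = sum(end_offsets.values())
    consumed = sum(int(v) for v in ingested_offsets.values())
    return produced - consumed  # alert when this difference grows too large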

Hourly data checks

DASH runs hourly and serves as our first line of defence to detect any data quality issues within the streams. We issue queries to the source database and our streaming layer to confirm that the ID counts of data created within the last hour match.

DASH helps in the early detection of upstream issues. We have noticed cases where our Debezium connectors failed and our checker reported fewer records than expected since there were no incoming messages to Kafka.
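
At its core the hourly check is two counts over the same one-hour window, one against the source MySQL database and one against the streamed data, compared and reported on mismatch. A skeletal sketch, with the query and notification helpers passed in as callables since the real DASH service is more involved:

def check_hour(table, hour_start, hour_end, source_count, streamed_count, notify):
    # source_count / streamed_count run a count of IDs created in the window
    # against MySQL and the streaming layer respectively; notify posts to Slack.
    expected = source_count(table, hour_start, hour_end)
    actual = streamed_count(table, hour_start, hour_end)
    if expected != actual:
        notify(f"[DASH] {table}: source={expected} streamed={actual} "
               f"for window {hour_start}..{hour_end}")
    return expected == actual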

DASH matches and mismatches reported to Slack

 

Materializing tables through compaction

Having CDC data in our data lake does not conclude our responsibilities. Batched compaction applies all captured CDC events and makes them available as Presto tables for downstream consumption. The job is set to trigger hourly and process all changes to the database within the past hour. For example, changes to a record are visible in real-time, but the latest state of the record will not be reflected until the next batch job runs. We addressed several issues with streaming during this phase.

Deduplication of data

Trailblazer was not built to deliver exactly-once guarantees. We ensure that duplicated CDC events are addressed during compaction.

Availability of all data until certain hour

We want to make sure that downstream pipelines use the output data of the hourly batch job only when the pipeline has all records for that hour. In case an event is processed late by streaming, the current pipeline will wait until the data is complete. Here, we are consciously choosing consistency over availability for our downstream users. For example, missing a few booking insert records in peak hours due to consumer processing delay can generate wrong downstream results, leading to miscalculation of revenue. We want to start downstream processes only when the data for the hour or day is complete.

Need for latest state of each event

Our compaction job performs upserts on the data to ensure that our downstream users can consume records in their latest state.
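
Deduplication and "latest state wins" can both be expressed as a windowed rank over the primary key, keeping only the most recent change event per key. A hedged PySpark sketch of that idea (the id and source_ts column names are assumptions):

from pyspark.sql import Window
from pyspark.sql import functions as F

def compact(change_logs):
    # change_logs: one row per CDC event, with a primary key `id` and an event
    # ordering column `source_ts` (binlog position or timestamp).
    latest_first = Window.partitionBy("id").orderBy(F.col("source_ts").desc())
    return (
        change_logs
        .withColumn("rn", F.row_number().over(latest_first))
        .filter(F.col("rn") == 1)   # drop duplicates and superseded versions
        .drop("rn")
    )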

Future applications

Trailblazer is a milestone for the Data Engineering team as it represents our commitment to achieving large scale data streams that reduce latencies for our end users. Moving ahead, our team will be exploring how we can further optimize streaming jobs by analysing data trends over time, and will build applications such as snapshot tables on top of the CDCs being streamed into our data lake.

How we simplified our Data Ingestion & Transformation Process

Post Syndicated from Grab Tech original https://engineering.grab.com/data-ingestion-transformation-product-insights

Introduction

As Grab grew from a small startup to an organisation serving millions of customers and driver partners, making day-to-day data-driven decisions became paramount. We needed a system to efficiently ingest data from mobile apps and backend systems and then make it available for analytics and engineering teams.

Thanks to modern data processing frameworks, ingesting data isn’t a big issue. However, at Grab scale it is a non-trivial task. We had to prepare for two key scenarios:

  • Business growth, including organic growth over time and expected seasonality effects.
  • Any unexpected peaks due to unforeseen circumstances. Our systems have to be horizontally scalable.

We could ingest data in batches, in real time, or a combination of the two. When you ingest data in batches, you can import it at regularly scheduled intervals or when it reaches a certain size. This is very useful when processes run on a schedule, such as reports that run daily at a specific time. Typically, batched data is useful for offline analytics and data science.

On the other hand, real-time ingestion has significant business value, such as with reactive systems. For example, when a customer provides feedback for a Grab superapp widget, we re-rank widgets based on that customer’s likes or dislikes. Note that when information is very time-sensitive, you must monitor it continuously.

This blog post describes how Grab built a scalable data ingestion system and how we went from prototyping with Spark Streaming to running a production-grade data processing cluster written in Golang.

Building the system without reinventing the wheel

The data ingestion system:

  1. Collects raw data as app events.
  2. Transforms the data into a structured format.
  3. Stores the data for analysis and monitoring.

In a previous blog post, we discussed dealing with batched data ETL with Spark. This post focuses on real-time ingestion.

We separated the data ingestion system into 3 layers: collection, transformation, and storage. This table and diagram highlights the tools used in each layer in our system’s first design.

Layer          | Tools
Collection     | Gateway, Kafka
Transformation | Go processing service, Spark Streaming
Storage        | TalariaDB

Our first design might seem complex, but we used battle-tested and common tools such as Apache Kafka and Spark Streaming. This let us get an end-to-end solution up and running quickly.

Collection layer

Our collection layer had two sub-layers:

  1. Our custom built API Gateway received HTTP requests from the mobile app. It simply decoded and authenticated HTTP requests, streaming the data to the Kafka queue.
  2. The Kafka queue decoupled the transformation layer (shown in the above figure as the processing service and Spark streaming) from the collection layer (shown above as the Gateway service). We needed to retain raw data in the Kafka queue for fault tolerance of the entire system. Imagine an error where a data pipeline pollutes the data with flawed transformation code or just simply crashes. The Kafka queue saves us from data loss because we can backfill from it.

Since it’s robust and battle-tested, we chose Kafka as our queueing solution. It perfectly met our requirements, such as high throughput and low latency. Although Kafka takes some operational effort such as self-hosting and monitoring, Grab has a proficient and dedicated team managing our Kafka cluster.

Transformation layer

There are many options for real-time data processing, including Spark Streaming, Flink, and Storm. Since we use Spark for all our batch processing, we decided to use Spark Streaming.

We deployed a Golang processing service between Kafka and Spark Streaming. This service converts the data from Protobuf to Avro. Instead of pointing Spark Streaming directly to Kafka, we used this processing service as an intermediary. This was because our Spark Streaming job was written in Python and Spark doesn’t natively support protobuf decoding.  We used Avro format, since Grab historically used it for archiving streaming data. Each raw event was enriched and batched together with other events. Batches were then uploaded to S3.

Storage layer

TalariaDB is a Grab-built time-series database. It ingests events as columnar ORC files, indexing them by event name and time. We use the same ORC format files for batch processing. TalariaDB also implements the Presto Thrift connector interface, so our users could query certain event types by time range. They did this by connecting Presto to a distributed cluster hosting TalariaDB.

Problems

Building and deploying our data pipeline’s MVP provided great value to our data analysts, engineers, and QA team. For example, our mobile app team could monitor any abnormal change in the real-time metrics, such as the screen load time for the latest released app version. The QA team could perform app-side actions (book a ride, make a payment, etc.) and check which events were triggered and received by the backend. The latency between the ingestion and the serving layer was only 4 minutes instead of the batch processing system’s 60 minutes. The streamed data showed good business value.

This prompted us to develop more features on top of our platform-collected real-time data. Very soon our QA engineers and the product analytics team used more and more of the real-time data processing system. They started instrumenting various mobile applications so more data started flowing in. However, as our ingested data increased, so did our problems. These were mostly related to operational complexity and the increased latency.

Operational complexity

Only a few team members could operate Spark Streaming and EMR. With more data and variable rates, our streaming jobs had scaling issues and failed occasionally. This was due to checkpoint issues when the cluster was under heavy load. Increasing the cluster size helped, but adding more nodes also increased the likelihood of losing more cluster nodes. When we lost nodes, our latency went up, adding more work for our already busy on-call engineers.

Supporting native Protobuf

To simplify the architecture, we initially planned to bypass our Golang-written processing service for the real-time data pipeline. Our plan was to let Spark directly talk to the Kafka queue and send the output to S3. This required packaging the decoders for our protobuf messages for Python Spark jobs, which was cumbersome. We thought about rewriting our job in Scala, but we didn’t have enough experience with it.

Also, we’d soon hit some streaming limits from S3. Our Spark streaming job was consuming objects from S3, but the process was not continuous due to S3’s  eventual consistency. To avoid long pagination queries in the S3 API, we had to prefix the data with the hour in which it was ingested. This resulted in some data loss after processing by the Spark streaming. The loss happened because the new data would appear in S3 while Spark Streaming had already moved on to the next hour. We tried various tweaks, but it was just a bad design. As our data grew to over one terabyte per hour, our data loss grew with it.

Processing lag

On average, the time from our system ingesting an event to when it was available on the Presto was 4 to 6 minutes. We call that processing lag, as it happened due to our data processing. It was substantially worse under heavy loads, increasing to 8 to 13 minutes. While that wasn’t bad at this scale (a few TBs of data), it made some use cases impossible, such as monitoring. We needed to do better.

Simplifying the architecture and rewriting in Golang

After completing the MVP phase development, we noticed the Spark Streaming functionality we actually used was relatively trivial. In the Spark Streaming job, we only:

  • Partitioned the batch of events by event name.
  • Encoded the data in ORC format.
  • And uploaded to an S3 bucket.

To mitigate the problems mentioned above, we tried re-implementing the features in our existing Golang processing service. Besides consuming the data and publishing to an S3 bucket, the transformation service also needed to deal with event partitioning and ORC encoding.

One key problem we addressed was implementing a robust event partitioner with a large write throughput and low read latency. Fortunately, Golang has a nice concurrent map package. To further reduce the lock contention, we added sharding.
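
The production partitioner is written in Go on top of its concurrent map, but the sharding idea itself is language-agnostic: hash each event name to one of N independently locked shards so that writers rarely contend on the same lock. A small Python illustration of that structure:

import threading
from collections import defaultdict

class ShardedPartitioner:
    """Buckets events by event name across N shards, each guarded by its own lock."""

    def __init__(self, num_shards=16):
        self.num_shards = num_shards
        self.locks = [threading.Lock() for _ in range(num_shards)]
        self.shards = [defaultdict(list) for _ in range(num_shards)]

    def add(self, event_name, payload):
        shard = hash(event_name) % self.num_shards
        with self.locks[shard]:            # contention is limited to a single shard
            self.shards[shard][event_name].append(payload)

    def drain(self):
        # Called by the flush loop (for example, once a minute) to hand batches
        # off for ORC encoding and upload.
        batches = {}
        for shard, lock in zip(self.shards, self.locks):
            with lock:
                for name, events in shard.items():
                    batches.setdefault(name, []).extend(events)
                shard.clear()
        return batches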

We made the changes, deployed the service to production, and discovered our service was now memory-bound, as we buffered data for 1 minute. We did thorough benchmarking and profiling of heap allocations to improve memory utilization. By iteratively reducing inefficiencies, we lowered CPU consumption and made our data transformation more efficient.

Performance

After revamping the system, the elapsed time for a single event to travel from the gateway to our dashboard is about 1 minute. We also fixed the data loss issue. Finally, we significantly reduced our on-call workload by removing Spark Streaming.

Validation

At this point, we had both our old and new pipelines running in parallel. After drastically improving our performance, we needed to confirm we still got the same end results. This was done by running a query against each of the pipelines and comparing the results. Both systems were registered to the same Presto cluster.

We ran a SQL EXCEPT query between the two pipelines, in both directions. Both pipelines returned the same events (the difference count was 0), validating our new pipeline’s correctness.

select count(1) from ((
 select uuid, time from grab_x.realtime_new
 where event = 'app.metric1' and time between 1541734140 and 1541734200
) except (
 select uuid, time from grab_x.realtime_old
 where event = 'app.metric1' and time between 1541734140 and 1541734200
))

/* output: 0 */

Conclusions

Scaling a data ingestion system to handle hundreds of thousands of events per second was a non-trivial task. However, by iterating and constantly simplifying our overall architecture, we were able to efficiently ingest the data and drive down its lag to around one minute.

Spark Streaming was a great tool and gave us time to understand the problem. But, understanding what we actually needed to build and iteratively optimise the entire data pipeline led us to:

  • Replacing Spark Streaming with our new Golang-implemented pipeline.
  • Removing Avro encoding.
  • Removing an intermediary S3 step.

Differences between the old and new pipelines are:

            | Old Pipeline          | New Pipeline
Languages   | Python, Go            | Go
Stages      | 4 services            | 3 services
Conversions | Protobuf → Avro → ORC | Protobuf → ORC
Lag         | 4-13 min              | 1 min

Systems usually become more and more complex over time, leading to tech debt and decreased performance. In our case, starting with more steps in the data pipeline was actually the simple solution, since we could re-use existing tools. But as we reduced processing stages, we’ve also seen fewer failures. By simplifying the problem, we improved performance and decreased operational complexity. At the end of the day, our data pipeline solves exactly our problem and does nothing else, keeping things fast.