Tag Archives: Amazon AppFlow

Simplify data transfer: Google BigQuery to Amazon S3 using Amazon AppFlow

Post Syndicated from Kartikay Khator original https://aws.amazon.com/blogs/big-data/simplify-data-transfer-google-bigquery-to-amazon-s3-using-amazon-appflow/

In today’s data-driven world, the ability to effortlessly move and analyze data across diverse platforms is essential. Amazon AppFlow, a fully managed data integration service, has been at the forefront of streamlining data transfer between AWS services, software as a service (SaaS) applications, and now Google BigQuery. In this blog post, you explore the new Google BigQuery connector in Amazon AppFlow and discover how it simplifies the process of transferring data from Google’s data warehouse to Amazon Simple Storage Service (Amazon S3), providing significant benefits for data professionals and organizations, including the democratization of multi-cloud data access.

Overview of Amazon AppFlow

Amazon AppFlow is a fully managed integration service that you can use to securely transfer data between SaaS applications such as Google BigQuery, Salesforce, SAP, Hubspot, and ServiceNow, and AWS services such as Amazon S3 and Amazon Redshift, in just a few clicks. With Amazon AppFlow, you can run data flows at nearly any scale at the frequency you choose—on a schedule, in response to a business event, or on demand. You can configure data transformation capabilities such as filtering and validation to generate rich, ready-to-use data as part of the flow itself, without additional steps. Amazon AppFlow automatically encrypts data in motion, and allows you to restrict data from flowing over the public internet for SaaS applications that are integrated with AWS PrivateLink, reducing exposure to security threats.

Introducing the Google BigQuery connector

The new Google BigQuery connector in Amazon AppFlow unveils possibilities for organizations seeking to use the analytical capability of Google’s data warehouse, and to effortlessly integrate, analyze, store, or further process data from BigQuery, transforming it into actionable insights.

Architecture

Let’s review the architecture to transfer data from Google BigQuery to Amazon S3 using Amazon AppFlow.

architecture

  1. Select a data source: In Amazon AppFlow, select Google BigQuery as your data source. Specify the tables or datasets you want to extract data from.
  2. Field mapping and transformation: Configure the data transfer using the intuitive visual interface of Amazon AppFlow. You can map data fields and apply transformations as needed to align the data with your requirements.
  3. Transfer frequency: Decide how frequently you want to transfer data—such as daily, weekly, or monthly—supporting flexibility and automation.
  4. Destination: Specify an S3 bucket as the destination for your data. Amazon AppFlow will efficiently move the data, making it accessible in your Amazon S3 storage.
  5. Consumption: Use Amazon Athena to analyze the data in Amazon S3.

Prerequisites

The dataset used in this solution is generated by Synthea, a synthetic patient population simulator and opensource project under the Apache License 2.0. Load this data into Google BigQuery or use your existing dataset.

Connect Amazon AppFlow to your Google BigQuery account

For this post, you use a Google account, OAuth client with appropriate permissions, and Google BigQuery data. To enable Google BigQuery access from Amazon AppFlow, you must set up a new OAuth client in advance. For instructions, see Google BigQuery connector for Amazon AppFlow.

Set up Amazon S3

Every object in Amazon S3 is stored in a bucket. Before you can store data in Amazon S3, you must create an S3 bucket to store the results.

Create a new S3 bucket for Amazon AppFlow results

To create an S3 bucket, complete the following steps:

  1. On the AWS Management console for Amazon S3, choose Create bucket.
  2. Enter a globally unique name for your bucket; for example, appflow-bq-sample.
  3. Choose Create bucket.

Create a new S3 bucket for Amazon Athena results

To create an S3 bucket, complete the following steps:

  1. On the AWS Management console for Amazon S3, choose Create bucket.
  2. Enter a globally unique name for your bucket; for example, athena-results.
  3. Choose Create bucket.

User role (IAM role) for AWS Glue Data Catalog

To catalog the data that you transfer with your flow, you must have the appropriate user role in AWS Identity and Access Management (IAM). You provide this role to Amazon AppFlow to grant the permissions it needs to create an AWS Glue Data Catalog, tables, databases, and partitions.

For an example IAM policy that has the required permissions, see Identity-based policy examples for Amazon AppFlow.

Walkthrough of the design

Now, let’s walk through a practical use case to see how the Amazon AppFlow Google BigQuery to Amazon S3 connector works. For the use case, you will use Amazon AppFlow to archive historical data from Google BigQuery to Amazon S3 for long-term storage an analysis.

Set up Amazon AppFlow

Create a new Amazon AppFlow flow to transfer data from Google Analytics to Amazon S3.

  1. On the Amazon AppFlow console, choose Create flow.
  2. Enter a name for your flow; for example, my-bq-flow.
  3. Add necessary Tags; for example, for Key enter env and for Value enter dev.

appflow-flow-setup­­­­

  1. Choose Next.
  2. For Source name, choose Google BigQuery.
  3. Choose Create new connection.
  4. Enter your OAuth Client ID and Client Secret, then name your connection; for example, bq-connection.

­bq-connection

  1. In the pop-up window, choose to allow amazon.com access to the Google BigQuery API.

bq-authentication

  1. For Choose Google BigQuery object, choose Table.
  2. For Choose Google BigQuery subobject, choose BigQueryProjectName.
  3. For Choose Google BigQuery subobject, choose DatabaseName.
  4. For Choose Google BigQuery subobject, choose TableName.
  5. For Destination name, choose Amazon S3.
  6. For Bucket details, choose the Amazon S3 bucket you created for storing Amazon AppFlow results in the prerequisites.
  7. Enter raw as a prefix.

appflow-source-destination

  1. Next, provide AWS Glue Data Catalog settings to create a table for further analysis.
    1. Select the User role (IAM role) created in the prerequisites.
    2. Create new database for example, healthcare.
    3. Provide a table prefix setting for example, bq.

glue-crawler-config

  1. Select Run on demand.

appflow-trigger-setup

  1. Choose Next.
  2. Select Manually map fields.
  3. Select the following six fields for Source field name from the table Allergies:
    1. Start
    2. Patient
    3. Code
    4. Description
    5. Type
    6. Category
  4. Choose Map fields directly.

appflow-field-mapping

  1. Choose Next.
  2. In the Add filters section, choose Next.
  3. Choose Create flow.

Run the flow

After creating your new flow, you can run it on demand.

  1. On the Amazon AppFlow console, choose my-bq-flow.
  2. Choose Run flow.

sppflow-run--status

For this walkthrough, choose run the job on-demand for ease of understanding. In practice, you can choose a scheduled job and periodically extract only newly added data.

Query through Amazon Athena

When you select the optional AWS Glue Data Catalog settings, Data Catalog creates the catalog for the data, allowing Amazon Athena to perform queries.

If you’re prompted to configure a query results location, navigate to the Settings tab and choose Manage. Under Manage settings, choose the Athena results bucket created in prerequisites and choose Save.

  1. On the Amazon Athena console, select the Data Source as AWSDataCatalog.
  2. Next, select Database as healthcare.
  3. Now you can select the table created by the AWS Glue crawler and preview it.

athena-results

  1. You can also run a custom query to find the top 10 allergies as shown in the following query.

Note: In the below query, replace the table name, in this case bq_appflow_mybqflow_1693588670_latest, with the name of the table generated in your AWS account.

SELECT type,
category,
"description",
count(*) as number_of_cases
FROM "healthcare"."bq_appflow_mybqflow_1693588670_latest"
GROUP BY type,
category,
"description"
ORDER BY number_of_cases DESC
LIMIT 10;

  1. Choose Run query.

athena-custom-query-results

This result shows the top 10 allergies by number of cases.

Clean up

To avoid incurring charges, clean up the resources in your AWS account by completing the following steps:

  1. On the Amazon AppFlow console, choose Flows in the navigation pane.
  2. From the list of flows, select the flow my-bq-flow, and delete it.
  3. Enter delete to delete the flow.
  4. Choose Connections in the navigation pane.
  5. Choose Google BigQuery from the list of connectors, select bq-connector, and delete it.
  6. Enter delete to delete the connector.
  7. On the IAM console, choose Roles in the navigation page, then select the role you created for AWS Glue crawler and delete it.
  8. On the Amazon Athena console:
    1. Delete the tables created under the database healthcare using AWS Glue crawler.
    2. Drop the database healthcare
  9. On the Amazon S3 console, search for the Amazon AppFlow results bucket you created, choose Empty to delete the objects, then delete the bucket.
  10. On the Amazon S3 console, search for the Amazon Athena results bucket you created, choose Empty to delete the objects, then delete the bucket.
  11. Clean up resources in your Google account by deleting the project that contains the Google BigQuery resources. Follow the documentation to clean up the Google resources.

Conclusion

The Google BigQuery connector in Amazon AppFlow streamlines the process of transferring data from Google’s data warehouse to Amazon S3. This integration simplifies analytics and machine learning, archiving, and long-term storage, providing significant benefits for data professionals and organizations seeking to harness the analytical capabilities of both platforms.

With Amazon AppFlow, the complexities of data integration are eliminated, enabling you to focus on deriving actionable insights from your data. Whether you’re archiving historical data, performing complex analytics, or preparing data for machine learning, this connector simplifies the process, making it accessible to a broader range of data professionals.

If you’re interested to see how the data transfer from Google BigQuery to Amazon S3 using Amazon AppFlow, take a look at step-by-step video tutorial. In this tutorial, we walk through the entire process, from setting up the connection to running the data transfer flow. For more information on Amazon AppFlow, visit Amazon AppFlow.


About the authors

Kartikay Khator is a Solutions Architect on the Global Life Science at Amazon Web Services. He is passionate about helping customers on their cloud journey with focus on AWS analytics services. He is an avid runner and enjoys hiking.

Kamen SharlandjievKamen Sharlandjiev is a Sr. Big Data and ETL Solutions Architect and Amazon AppFlow expert. He’s on a mission to make life easier for customers who are facing complex data integration challenges. His secret weapon? Fully managed, low-code AWS services that can get the job done with minimal effort and no coding.

Empower your Jira data in a data lake with Amazon AppFlow and AWS Glue

Post Syndicated from Tom Romano original https://aws.amazon.com/blogs/big-data/empower-your-jira-data-in-a-data-lake-with-amazon-appflow-and-aws-glue/

In the world of software engineering and development, organizations use project management tools like Atlassian Jira Cloud. Managing projects with Jira leads to rich datasets, which can provide historical and predictive insights about project and development efforts.

Although Jira Cloud provides reporting capability, loading this data into a data lake will facilitate enrichment with other business data, as well as support the use of business intelligence (BI) tools and artificial intelligence (AI) and machine learning (ML) applications. Companies often take a data lake approach to their analytics, bringing data from many different systems into one place to simplify how the analytics are done.

This post shows you how to use Amazon AppFlow and AWS Glue to create a fully automated data ingestion pipeline that will synchronize your Jira data into your data lake. Amazon AppFlow provides software as a service (SaaS) integration with Jira Cloud to load the data into your AWS account. AWS Glue is a serverless data discovery, load, and transformation service that will prepare data for consumption in BI and AI/ML activities. Additionally, this post strives to achieve a low-code and serverless solution for operational efficiency and cost optimization, and the solution supports incremental loading for cost optimization.

Solution overview

This solution uses Amazon AppFlow to retrieve data from the Jira Cloud. The data is synchronized to an Amazon Simple Storage Service (Amazon S3) bucket using an initial full download and subsequent incremental downloads of changes. When new data arrives in the S3 bucket, an AWS Step Functions workflow is triggered that orchestrates extract, transform, and load (ETL) activities using AWS Glue crawlers and AWS Glue DataBrew. The data is then available in the AWS Glue Data Catalog and can be queried by services such as Amazon Athena, Amazon QuickSight, and Amazon Redshift Spectrum. The solution is completely automated and serverless, resulting in low operational overhead. When this setup is complete, your Jira data will be automatically ingested and kept up to date in your data lake!

The following diagram illustrates the solution architecture.

The Jira Appflow Architecture is shown. The Jira Cloud data is retrieved by Amazon AppFlow and is stored in Amazon S3. This triggers an Amazon EventBridge event that runs an AWS Step Functions workflow. The workflow uses AWS Glue to catalog and transform the data, The data is then queried with QuickSight.

The Step Functions workflow orchestrates the following ETL activities, resulting in two tables:

  • An AWS Glue crawler collects all downloads into a single AWS Glue table named jira_raw. This table is comprised of a mix of full and incremental downloads from Jira, with many versions of the same records representing changes over time.
  • A DataBrew job prepares the data for reporting by unpacking key-value pairs in the fields, as well as removing depreciated records as they are updated in subsequent change data captures. This reporting-ready data will available in an AWS Glue table named jira_data.

The following figure shows the Step Functions workflow.

A diagram represents the AWS Step Functions workflow. It contains the steps to run an AWS Crawler, wait for it's completion, and then run a AWS Glue DataBrew data transformation job.

Prerequisites

This solution requires the following:

  • Administrative access to your Jira Cloud instance, and an associated Jira Cloud developer account.
  • An AWS account and a login with access to the AWS Management Console. Your login will need AWS Identity and Access Management (IAM) permissions to create and access the resources in your AWS account.
  • Basic knowledge of AWS and working knowledge of Jira administration.

Configure the Jira Instance

After logging in to your Jira Cloud instance, you establish a Jira project with associated epics and issues to download into a data lake. If you’re starting with a new Jira instance, it helps to have at least one project with a sampling of epics and issues for the initial data download, because it allows you to create an initial dataset without errors or missing fields. Note that you may have multiple projects as well.

An image show a Jira Cloud example, with several issues arranged in a Kansan board.

After you have established your Jira project and populated it with epics and issues, ensure you also have access to the Jira developer portal. In later steps, you use this developer portal to establish authentication and permissions for the Amazon AppFlow connection.

Provision resources with AWS CloudFormation

For the initial setup, you launch an AWS CloudFormation stack to create an S3 bucket to store data, IAM roles for data access, and the AWS Glue crawler and Data Catalog components. Complete the following steps:

  1. Sign in to your AWS account.
  2. Click Launch Stack:
  3. For Stack name, enter a name for the stack (the default is aws-blog-jira-datalake-with-AppFlow).
  4. For GlueDatabaseName, enter a unique name for the Data Catalog database to hold the Jira data table metadata (the default is jiralake).
  5. For InitialRunFlag, choose Setup. This mode will scan all data and disable the change data capture (CDC) features of the stack. (Because this is the initial load, the stack needs an initial data load before you configure CDC in later steps.)
  6. Under Capabilities and transforms, select the acknowledgement check boxes to allow IAM resources to be created within your AWS account.
  7. Review the parameters and choose Create stack to deploy the CloudFormation stack. This process will take around 5–10 minutes to complete.
    An image depicts the Amazon CloudFormation configuration steps, including setting a stack name, setting parameters to "jiralake" and "Setup" mode, and checking all IAM capabilities requested.
  8. After the stack is deployed, review the Outputs tab for the stack and collect the following values to use when you set up Amazon AppFlow:
    • Amazon AppFlow destination bucket (o01AppFlowBucket)
    • Amazon AppFlow destination bucket path (o02AppFlowPath)
    • Role for Amazon AppFlow Jira connector (o03AppFlowRole)
      An image demonstrating the Amazon Cloudformation "Outputs" tab, highlighting the values to add to the Amazon AppFlow configuration.

Configure Jira Cloud

Next, you configure your Jira Cloud instance for access by Amazon AppFlow. For full instructions, refer to Jira Cloud connector for Amazon AppFlow. The following steps summarize these instructions and discuss the specific configuration to enable OAuth in the Jira Cloud:

  1. Open the Jira developer portal.
  2. Create the OAuth 2 integration from the developer application console by choosing Create an OAuth 2.0 Integration. This will provide a login mechanism for AppFlow.
  3. Enable fine-grained permissions. See Recommended scopes for the permission settings to grant AppFlow appropriate access to your Jira instance.
  4. Add the following permission scopes to your OAuth app:
    1. manage:jira-configuration
    2. read:field-configuration:jira
  5. Under Authorization, set the Call Back URL to return to Amazon AppFlow with the URL https://us-east-1.console.aws.amazon.com/AppFlow/oauth.
  6. Under Settings, note the client ID and secret to use in later steps to set up authentication from Amazon AppFlow.

Create the Amazon AppFlow Jira Cloud connection

In this step, you configure Amazon AppFlow to run a one-time full data fetch of all your data, establishing the initial data lake:

  1. On the Amazon AppFlow console, choose Connectors in the navigation pane.
  2. Search for the Jira Cloud connector.
  3. Choose Create flow on the connector tile to create the connection to your Jira instance.
    An image of Amazon AppFlor, showing the search for the "Jira Cloud" connector.
  4. For Flow name, enter a name for the flow (for example, JiraLakeFlow).
  5. Leave the Data encryption setting as the default.
  6. Choose Next.
    The Amazon AppFlow Jira connector configuration, showing the Flow name set to "JiraLakeFlow" and clicking the "next" button.
  7. For Source name, keep the default of Jira Cloud.
  8. Choose Create new connection under Jira Cloud connection.
  9. In the Connect to Jira Cloud section, enter the values for Client ID, Client secret, and Jira Cloud Site that you collected earlier. This provides the authentication from AppFlow to Jira Cloud.
  10. For Connection Name, enter a connection name (for example, JiraLakeCloudConnection).
  11. Choose Connect. You will be prompted to allow your OAuth app to access your Atlassian account to verify authentication.
    An image of the Amazon AppFlow conflagration, reflecting the completion of the prior steps.
  12. In the Authorize App window that pops up, choose Accept.
  13. With the connection created, return to the Configure flow section on the Amazon AppFlow console.
  14. For API version, choose V2 to use the latest Jira query API.
  15. For Jira Cloud object, choose Issue to query and download all issues and associated details.
    An image of the Amazon AppFlow configuration, reflecting the completion of the prior steps.
  16. For Destination Name in the Destination Details section, choose Amazon S3.
  17. For Bucket details, choose the S3 bucket name that matches the Amazon AppFlow destination bucket value that you collected from the outputs of the CloudFormation stack.
  18. Enter the Amazon AppFlow destination bucket path to complete the full S3 path. This will send the Jira data to the S3 bucket created by the CloudFormation script.
  19. Leave Catalog your data in the AWS Glue Data Catalog unselected. The CloudFormation script uses an AWS Glue crawler to update the Data Catalog in a different manner, grouping all the downloads into a common table, so we disable the update here.
  20. For File format settings, select Parquet format and select Preserve source data types in Parquet output. Parquet is a columnar format to optimize subsequent querying.
  21. Select Add a timestamp to the file name for Filename preference. This will allow you to easily find data files downloaded at a specific date and time.
    An image of the Amazon AppFlow configuration, reflecting the completion of the prior steps.
  22. For now, select Run on Demand for the Flow trigger to run the full load flow manually. You will schedule downloads in a later step when implementing CDC.
  23. Choose Next.
    An image of the Amazon AppFlow Flow Trigger configuration, reflecting the completion of the prior steps.
  24. On the Map data fields page, select Manually map fields.
  25. For Source to destination field mapping, choose the drop-down box under Source field name and select Map all fields directly. This will bring down all fields as they are received, because we will instead implement data preparation in later steps.
    An image of the Amazon AppFlow configuration, reflecting the completion of steps 24 & 25.
  26. Under Partition and aggregation settings, you can set up the partitions in a way that works for your use case. For this example, we use a daily partition, so select Date and time and choose Daily.
  27. For Aggregation settings, leave it as the default of Don’t aggregate.
  28. Choose Next.
    An image of the Amazon AppFlow configuration, reflecting the completion of steps 26-28.
  29. On the Add filters page, you can create filters to only download specific data. For this example, you download all the data, so choose Next.
  30. Review and choose Create flow.
  31. When the flow is created, choose Run flow to start the initial data seeding. After some time, you should receive a banner indicating the run finished successfully.
    An image of the Amazon AppFlow configuration, reflecting the completion of step 31.

Review seed data

At this stage in the process, you now have data in your S3 environment. When new data files are created in the S3 bucket, it will automatically run an AWS Glue crawler to catalog the new data. You can see if it’s complete by reviewing the Step Functions state machine for a Succeeded run status. There is a link to the state machine on the CloudFormation stack’s Resources tab, which will redirect you to the Step Functions state machine.

A image showing the CloudFormation resources tab of the stack, with a link to the AWS Step Functions workflow.

When the state machine is complete, it’s time to review the raw Jira data with Athena. The database is as you specified in the CloudFormation stack (jiralake by default), and the table name is jira_raw. If you kept the default AWS Glue database name of jiralake, the Athena SQL is as follows:

SELECT * FROM "jiralake"."jira_raw" limit 10;

If you explore the data, you’ll notice that most of the data you would want to work with is actually packed into a column called fields. This means the data is not available as columns in your Athena queries, making it harder to select, filter, and sort individual fields within an Athena SQL query. This will be addressed in the next steps.

An image demonstrating the Amazon Athena query SELECT * FROM "jiralake"."jira_raw" limit 10;

Set up CDC and unpack the fields columns

To add the ongoing CDC and reformat the data for analytics, we introduce a DataBrew job to transform the data and filter to the most recent version of each record as changes come in. You can do this by updating the CloudFormation stack with a flag that includes the CDC and data transformation steps.

  1. On the AWS CloudFormation console, return to the stack.
  2. Choose Update.
  3. Select Use current template and choose Next.
    An image showing Amazon CloudFormation, with steps 1-3 complete.
  4. For SetupOrCDC, choose CDC, then choose Next. This will enable both the CDC steps and the data transformation steps for the Jira data.
    An image showing Amazon CloudFormation, with step 4 complete.
  5. Continue choosing Next until you reach the Review section.
  6. Select I acknowledge that AWS CloudFormation might create IAM resources, then choose Submit.
    An image showing Amazon CloudFormation, with step 5-6 complete.
  7. Return to the Amazon AppFlow console and open your flow.
  8. On the Actions menu, choose Edit flow. We will now edit the flow trigger to run an incremental load on a periodic basis.
  9. Select Run flow on schedule.
  10. Configure the desired repeats, as well as start time and date. For this example, we choose Daily for Repeats and enter 1 for the number of days you’ll have the flow trigger. For Starting at, enter 01:00.
  11. Select Incremental transfer for Transfer mode.
  12. Choose Updated on the drop-down menu so that changes will be captured based on when the records were updated.
  13. Choose Save. With these settings in our example, the run will happen nightly at 1:00 AM.
    An image showing the Flow Trigger, with incremental transfer selected.

Review the analytics data

When the next incremental load occurs that results in new data, the Step Functions workflow will start the DataBrew job and populate a new staged analytical data table named jira_data in your Data Catalog database. If you don’t want to wait, you can trigger the Step Functions workflow manually.

The DataBrew job performs data transformation and filtering tasks. The job unpacks the key-values from the Jira JSON data and the raw Jira data, resulting in a tabular data schema that facilitates use with BI and AI/ML tools. As Jira items are changed, the changed item’s data is resent, resulting in multiple versions of an item in the raw data feed. The DataBrew job filters the raw data feed so that the resulting data table only contains the most recent version of each item. You could enhance this DataBrew job to further customize the data for your needs, such as renaming the generic Jira custom field names to reflect their business meaning.

When the Step Functions workflow is complete, we can query the data in Athena again using the following query:

SELECT * FROM "jiralake"."jira_data" limit 10;

You can see that in our transformed jira_data table, the nested JSON fields are broken out into their own columns for each field. You will also notice that we’ve filtered out obsolete records that have been superseded by more recent record updates in later data loads so the data is fresh. If you want to rename custom fields, remove columns, or restructure what comes out of the nested JSON, you can modify the DataBrew recipe to accomplish this. At this point, the data is ready to be used by your analytics tools, such as Amazon QuickSight.

An image demonstrating the Amazon Athena query SELECT * FROM "jiralake"."jira_data" limit 10;

Clean up

If you would like to discontinue this solution, you can remove it with the following steps:

  1. On the Amazon AppFlow console, deactivate the flow for Jira, and optionally delete it.
  2. On the Amazon S3 console, select the S3 bucket for the stack, and empty the bucket to delete the existing data.
  3. On the AWS CloudFormation console, delete the CloudFormation stack that you deployed.

Conclusion

In this post, we created a serverless incremental data load process for Jira that will synchronize data while handling custom fields using Amazon AppFlow, AWS Glue, and Step Functions. The approach uses Amazon AppFlow to incrementally load the data into Amazon S3. We then use AWS Glue and Step Functions to manage the extraction of the Jira custom fields and load them in a format to be queried by analytics services such as Athena, QuickSight, or Redshift Spectrum, or AI/ML services like Amazon SageMaker.

To learn more about AWS Glue and DataBrew, refer to Getting started with AWS Glue DataBrew. With DataBrew, you can take the sample data transformation in this project and customize the output to meet your specific needs. This could include renaming columns, creating additional fields, and more.

To learn more about Amazon AppFlow, refer to Getting started with Amazon AppFlow. Note that Amazon AppFlow supports integrations with many SaaS applications in addition to the Jira Cloud.

To learn more about orchestrating flows with Step Functions, see Create a Serverless Workflow with AWS Step Functions and AWS Lambda. The workflow could be enhanced to load the data into a data warehouse, such as Amazon Redshift, or trigger a refresh of a QuickSight dataset for analytics and reporting.

In future posts, we will cover how to unnest parent-child relationships within the Jira data using Athena and how to visualize the data using QuickSight.


About the Authors

Tom Romano is a Sr. Solutions Architect for AWS World Wide Public Sector from Tampa, FL, and assists GovTech and EdTech customers as they create new solutions that are cloud native, event driven, and serverless. He is an enthusiastic Python programmer for both application development and data analytics, and is an Analytics Specialist. In his free time, Tom flies remote control model airplanes and enjoys vacationing with his family around Florida and the Caribbean.

Shane Thompson is a Sr. Solutions Architect based out of San Luis Obispo, California, working with AWS Startups. He works with customers who use AI/ML in their business model and is passionate about democratizing AI/ML so that all customers can benefit from it. In his free time, Shane loves to spend time with his family and travel around the world.

Cross-account integration between SaaS platforms using Amazon AppFlow

Post Syndicated from Ramakant Joshi original https://aws.amazon.com/blogs/big-data/cross-account-integration-between-saas-platforms-using-amazon-appflow/

Implementing an effective data sharing strategy that satisfies compliance and regulatory requirements is complex. Customers often need to share data between disparate software as a service (SaaS) platforms within their organization or across organizations. On many occasions, they need to apply business logic to the data received from the source SaaS platform before pushing it to the target SaaS platform.

Let’s take an example. AnyCompany’s marketing team hosted an event at the Anaheim Convention Center, CA. The marketing team created leads based on the event in Adobe Marketo. An automated process downloaded the leads from Marketo in the marketing AWS account. These leads are then pushed to the sales AWS account. A business process picks up those leads, filters them based on a “Do Not Call” criteria, and creates entries in the Salesforce system. Now, the sales team can pursue those leads and continue to track the opportunities in Salesforce.

In this post, we show how to share your data across SaaS platforms in a cross-account structure using fully managed, low-code AWS services such as Amazon AppFlow, Amazon EventBridge, AWS Step Functions, and AWS Glue.

Solution overview

Considering our example of AnyCompany, let’s look at the data flow. AnyCompany’s Marketo instance is integrated with the producer AWS account. As the leads from Marketo land in the producer AWS account, they’re pushed to the consumer AWS account, which is integrated to Salesforce. Business logic is applied to the leads data in the consumer AWS account, and then the curated data is loaded into Salesforce.

We have used a serverless architecture to implement this use case. The following AWS services are used for data ingestion, processing, and load:

  • Amazon AppFlow is a fully managed integration service that enables you to securely transfer data between SaaS applications like Salesforce, SAP, Marketo, Slack, and ServiceNow, and AWS services like Amazon S3 and Amazon Redshift, in just a few clicks. With AppFlow, you can run data flows at nearly any scale at the frequency you choose—on a schedule, in response to a business event, or on demand. You can configure data transformation capabilities like filtering and validation to generate rich, ready-to-use data as part of the flow itself, without additional steps. Amazon AppFlow is used to download leads data from Marketo and upload the curated leads data into Salesforce.
  • Amazon EventBridge is a serverless event bus that lets you receive, filter, transform, route, and deliver events. EventBridge is used to track the events like receiving the leads data in the producer or consumer AWS accounts and then triggering a workflow.
  • AWS Step Functions is a visual workflow service that helps developers use AWS services to build distributed applications, automate processes, orchestrate microservices, and create data and machine learning (ML) pipelines. Step Functions is used to orchestrate the data processing.
  • AWS Glue is a serverless data preparation service that makes it easy to run extract, transform, and load (ETL) jobs. An AWS Glue job encapsulates a script that reads, processes, and then writes data to a new schema. This solution uses Python 3.6 AWS Glue jobs for data filtration and processing.
  • Amazon Simple Storage Service (Amazon S3) is an object storage service offering industry-leading scalability, data availability, security, and performance. Amazon S3 is used to store the leads data.

Let’s review the architecture in detail. The following diagram shows a visual representation of how this integration works.

The following steps outline the process for transferring and processing leads data using Amazon AppFlow, Amazon S3, EventBridge, Step Functions, AWS Glue, and Salesforce:

  1. Amazon AppFlow runs on a daily schedule and retrieves any new leads created within the last 24 hours (incremental changes) from Marketo.
  2. The leads are saved as Parquet format files in an S3 bucket in the producer account.
  3. When the daily flow is complete, Amazon AppFlow emits events to EventBridge.
  4. EventBridge triggers Step Functions.
  5. Step Functions copies the Parquet format files containing the leads from the producer account’s S3 bucket to the consumer account’s S3 bucket.
  6. Upon a successful file transfer, Step Functions publishes an event in the consumer account’s EventBridge.
  7. An EventBridge rule intercepts this event and triggers Step Functions in the consumer account.
  8. Step Functions calls an AWS Glue crawler, which scans the leads Parquet files and creates a table in the AWS Glue Data Catalog.
  9. The AWS Glue job is called, which selects records with the Do Not Call field set to false from the leads files, and creates a new set of curated Parquet files. We have used an AWS Glue job for the ETL pipeline to showcase how you can use purpose-built analytics service for complex ETL needs. However, for simple filtering requirements like Do Not Call, you can use the existing filtering feature of Amazon AppFlow.
  10. Step Functions then calls Amazon AppFlow.
  11. Finally, Amazon AppFlow populates the Salesforce leads based on the data in the curated Parquet files.

We have provided artifacts in this post to deploy the AWS services in your account and try out the solution.

Prerequisites

To follow the deployment walkthrough, you need two AWS accounts, one for the producer and other for the consumer. Use us-east-1 or us-west-2 as your AWS Region.

Consumer account setup:

Stage the data

To prepare the data, complete the following steps:

  1. Download the zipped archive file to use for this solution and unzip the files locally.

The AWS Glue job uses the glue-job.py script to perform ETL and populates the curated table in the Data Catalog.

  1. Create an S3 bucket called consumer-configbucket-<ACCOUNT_ID> via the Amazon S3 console in the consumer account, where ACCOUNT_ID is your AWS account ID.
  2. Upload the script to this location.

Create a connection to Salesforce

Follow the connection setup steps outlined in here. Please make a note of the Salesforce connector name.

Create a connection to Salesforce in the consumer account

Follow the connection setup steps outlined in Create Opportunity Object Flow.

Set up resources with AWS CloudFormation

We provided two AWS CloudFormation templates to create resources: one for the producer account, and one for the consumer account.

Amazon S3 now applies server-side encryption with Amazon S3 managed keys (SSE-S3) as the base level of encryption for every bucket in Amazon S3. Starting January 5, 2023, all new object uploads to Amazon S3 are automatically encrypted at no additional cost and with no impact on performance. We use this default encryption for both producer and consumer S3 buckets. If you choose to bring your own keys with AWS Key Management Service (AWS KMS), we recommend referring to Replicating objects created with server-side encryption (SSE-C, SSE-S3, SSE-KMS) for cross-account replication.

Launch the CloudFormation stack in the consumer account

Let’s start with creating resources in the consumer account. There are a few dependencies on the consumer account resources from the producer account. To launch the CloudFormation stack in the consumer account, complete the following steps:

  1. Sign in to the consumer account’s AWS CloudFormation console in the target Region.
  2. Choose Launch Stack.
    BDB-2063-launch-cloudformation-stack
  3. Choose Next.
  4. For Stack name, enter a stack name, such as stack-appflow-consumer.
  5. Enter the parameters for the connector name, object, and producer (source) account ID.
  6. Choose Next.
  7. On the next page, choose Next.
  8. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  9. Choose Create stack.

Stack creation takes approximately 5 minutes to complete. It will create the following resources. You can find them on the Outputs tab of the CloudFormation stack.

  • ConsumerS3Bucketconsumer-databucket-<consumer account id>
  • Consumer S3 Target Foldermarketo-leads-source
  • ConsumerEventBusArnarn:aws:events:<region>:<consumer account id>:event-bus/consumer-custom-event-bus
  • ConsumerEventRuleArnarn:aws:events:<region>:<consumer account id>:rule/consumer-custom-event-bus/consumer-custom-event-bus-rule
  • ConsumerStepFunctionarn:aws:states:<region>:<consumer account id>:stateMachine:consumer-state-machine
  • ConsumerGlueCrawlerconsumer-glue-crawler
  • ConsumerGlueJobconsumer-glue-job
  • ConsumerGlueDatabaseconsumer-glue-database
  • ConsumerAppFlowarn:aws:appflow:<region>:<consumer account id>:flow/consumer-appflow

Producer account setup:

Create a connection to Marketo

Follow the connection setup steps outlined in here. Please make a note of the Marketo connector name.

Launch the CloudFormation stack in the producer account

Now let’s create resources in the producer account. Complete the following steps:

  1. Sign in to the producer account’s AWS CloudFormation console in the source Region.
  2. Choose Launch Stack.
    BDB-2063-launch-cloudformation-stack
  3. Choose Next.
  4. For Stack name, enter a stack name, such as stack-appflow-producer.
  5. Enter the following parameters and leave the rest as default:
    • AppFlowMarketoConnectorName: name of the Marketo connector, created above
    • ConsumerAccountBucket: consumer-databucket-<consumer account id>
    • ConsumerAccountBucketTargetFolder: marketo-leads-source
    • ConsumerAccountEventBusArn: arn:aws:events:<region>:<consumer account id>:event-bus/consumer-custom-event-bus
    • DefaultEventBusArn: arn:aws:events:<region>:<producer account id>:event-bus/default


  6. Choose Next.
  7. On the next page, choose Next.
  8. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  9. Choose Create stack.

Stack creation takes approximately 5 minutes to complete. It will create the following resources. You can find them on the Outputs tab of the CloudFormation stack.

  • Producer AppFlowproducer-flow
  • Producer Bucketarn:aws:s3:::producer-bucket.<region>.<producer account id>
  • Producer Flow Completion Rulearn:aws:events:<region>:<producer account id>:rule/producer-appflow-completion-event
  • Producer Step Functionarn:aws:states:<region>:<producer account id>:stateMachine:ProducerStateMachine-xxxx
  • Producer Step Function Rolearn:aws:iam::<producer account id>:role/service-role/producer-stepfunction-role
  1. After successful creation of the resources, go to the consumer account S3 bucket, consumer-databucket-<consumer account id>, and update the bucket policy as follows:
{
    "Version": "2008-10-17",
    "Statement": [
        {
            "Sid": "AllowAppFlowDestinationActions",
            "Effect": "Allow",
            "Principal": {"Service": "appflow.amazonaws.com"},
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::consumer-databucket-<consumer-account-id>",
                "arn:aws:s3:::consumer-databucket-<consumer-account-id>/*"
            ]
        }, {
            "Sid": "Producer-stepfunction-role",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<producer-account-id>:role/service-role/producer-stepfunction-role"
            },
            "Action": [
                "s3:ListBucket",
                "s3:GetObject",
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": [
                "arn:aws:s3:::consumer-databucket-<consumer-account-id>",
                "arn:aws:s3:::consumer-databucket-<consumer-account-id>/*"
            ]
        }
    ]
}

Validate the workflow

Let’s walk through the flow:

  1. Review the Marketo and Salesforce connection setup in the producer and consumer account respectively.

In the architecture section, we suggested scheduling the AppFlow (producer-flow) in the producer account. However, for quick testing purposes, we demonstrate how to manually run the flow on demand.

  1. Go to the AppFlow (producer-flow) in the producer account. On the Filters tab of the flow, choose Edit filters.
  2. Choose the Created At date range for which you have data.
  3. Save the range and choose Run flow.
  4. Review the producer S3 bucket.

AppFlow generates the files in the producer-flow prefix within this bucket. The files are temporarily located in the producer S3 bucket under s3://<producer-bucket>.<region>.<account-id>/producer-flow.

  1. Review the EventBridge rule and Step Functions state machine in the producer account.

The Amazon AppFlow job completion triggers an EventBridge rule (arn:aws:events:<region>:<producer account id>:rule/producer-appflow-completion-event, as noted in the Outputs tab of the CloudFromation stack in the Producer Account), which triggers the Step Functions state machine (arn:aws:states:<region>:<producer account id>:stateMachine:ProducerStateMachine-xxxx) in the producer account. The state machine copies the files to the consumer S3 bucket from the producer-flow prefix in the producer S3 bucket. Once file copy is complete, the state machine moves the files from the producer-flow prefix to the archive prefix in the producer S3 bucket. You can find the files in s3://<producer-bucket>.<region>.<account-id>/archive.

  1. Review the consumer S3 bucket.

The Step Functions state machine in the producer account copies the files to the consumer S3 bucket and sends an event to EventBridge in the consumer account. The files are located in the consumer S3 bucket under s3://consumer-databucket-<account-id>/marketo-leads-source/.

  1. Review the EventBridge rule (arn:aws:events:<region>:<consumer account id>:rule/consumer-custom-event-bus/consumer-custom-event-bus-rule) in the consumer account, which should have triggered the Step Function workflow (arn:aws:states:<region>:<consumer account id>:stateMachine:consumer-state-machine).

The AWS Glue crawler (consumer-glue-crawler) runs to update the metadata followed by the AWS Glue job (consumer-glue-job), which curates the data by applying the Do not call filter. The curated files are placed in s3://consumer-databucket-<account-id>/marketo-leads-curated/. After data curation, the flow is started as part of the state machine.

  1. Review the Amazon AppFlow job (arn:aws:appflow:<region>:<consumer account id>:flow/consumer-appflow) run status in the consumer account.

Upon a successful run of the Amazon AppFlow job, the curated data files are moved to the s3://consumer-databucket-<account-id>/marketo-leads-processed/ folder and Salesforce is updated with the leads. Additionally, all the original source files are moved from s3://consumer-databucket-<account-id>/marketo-leads-source/ to s3://consumer-databucket-<account-id>/marketo-leads-archive/.

  1. Review the updated data in Salesforce.

You will see newly created or updated leads created by Amazon AppFlow.

Clean up

To clean up the resources created as part of this post, delete the following resources:

  1. Delete the resources in the producer account:
    • Delete the producer S3 bucket content.
    • Delete the CloudFormation stack.
  2. Delete the resources in the consumer account:
    • Delete the consumer S3 bucket content.
    • Delete the CloudFormation stack.

Summary

In this post, we showed how you can support a cross-account model to exchange data between different partners with different SaaS integrations using Amazon AppFlow. You can expand this idea to support multiple target accounts.

For more information, refer to Simplifying cross-account access with Amazon EventBridge resource policies. To learn more about Amazon AppFlow, visit Amazon AppFlow.


About the authors

Ramakant Joshi is an AWS Solutions Architect, specializing in the analytics and serverless domain. He has a background in software development and hybrid architectures, and is passionate about helping customers modernize their cloud architecture.

Debaprasun Chakraborty is an AWS Solutions Architect, specializing in the analytics domain. He has around 20 years of software development and architecture experience. He is passionate about helping customers in cloud adoption, migration and strategy.

Suraj Subramani Vineet is a Senior Cloud Architect at Amazon Web Services (AWS) Professional Services in Sydney, Australia. He specializes in designing and building scalable and cost-effective data platforms and AI/ML solutions in the cloud. Outside of work, he enjoys playing soccer on weekends.

Synchronize your Salesforce and Snowflake data to speed up your time to insight with Amazon AppFlow

Post Syndicated from Ramesh Ranganathan original https://aws.amazon.com/blogs/big-data/synchronize-your-salesforce-and-snowflake-data-to-speed-up-your-time-to-insight-with-amazon-appflow/

This post was co-written with Amit Shah, Principal Consultant at Atos.

Customers across industries seek meaningful insights from the data captured in their Customer Relationship Management (CRM) systems. To achieve this, they combine their CRM data with a wealth of information already available in their data warehouse, enterprise systems, or other software as a service (SaaS) applications. One widely used approach is getting the CRM data into your data warehouse and keeping it up to date through frequent data synchronization.

Integrating third-party SaaS applications is often complicated and requires significant effort and development. Developers need to understand the application APIs, write implementation and test code, and maintain the code for future API changes. Amazon AppFlow, which is a low-code/no-code AWS service, addresses this challenge.

Amazon AppFlow is a fully managed integration service that enables you to securely transfer data between SaaS applications, like Salesforce, SAP, Zendesk, Slack, and ServiceNow, and AWS services like Amazon Simple Storage Service (Amazon S3) and Amazon Redshift in just a few clicks. With Amazon AppFlow, you can run data flows at enterprise scale at the frequency you choose—on a schedule, in response to a business event, or on demand.

In this post, we focus on synchronizing your data from Salesforce to Snowflake (on AWS) without writing code. This post walks you through the steps to set up a data flow to address full and incremental data load using an example use case.

Solution overview

Our use case involves the synchronization of the Account object from Salesforce into Snowflake. In this architecture, you use Amazon AppFlow to filter and transfer the data to your Snowflake data warehouse.

You can configure Amazon AppFlow to run your data ingestion in three different ways:

  • On-demand – You can manually run the flow through the AWS Management Console, API, or SDK call.
  • Event-driven – Amazon AppFlow can subscribe and listen to change data capture (CDC) events from the source SaaS application.
  • Scheduled – Amazon AppFlow can run schedule-triggered flows based on a pre-defined schedule rule. With scheduled flows, you can choose either full or incremental data transfer:
    • With full transfer, Amazon AppFlow transfers a snapshot of all records at the time of the flow run from the source to the destination.
    • With incremental transfer, Amazon AppFlow transfers only the records that have been added or changed since the last successful flow run. To determine the incremental delta of your data, AppFlow requires you to specify a source timestamp field to instruct how Amazon AppFlow identifies new or updated records.

We use the on-demand trigger for the initial load of data from Salesforce to Snowflake, because it helps you pull all the records, irrespective of their creation. To then synchronize data periodically with Snowflake, after we run the on-demand trigger, we configure a scheduled trigger with incremental transfer. With this approach, Amazon AppFlow pulls the records based on a chosen timestamp field from the Salesforce Account object periodically, based on the time interval specified in the flow.

The Account_Staging table is created in Snowflake to act as a temporary storage that can be used to identify the data change events. Then the permanent table (Account) is updated from the staging table by running a SQL stored procedure that contains the incremental update logic. The following figure depicts the various components of the architecture and the data flow from the source to the target.

The data flow contains the following steps:

  1. First, the flow is run with on-demand and full transfer mode to load the full data into Snowflake.
  2. The Amazon AppFlow Salesforce connector pulls the data from Salesforce and stores it in the Account Data S3 bucket in CSV format.
  3. The Amazon AppFlow Snowflake connector loads the data into the Account_Staging table.
  4. A scheduled task, running at regular intervals in Snowflake, triggers a stored procedure.
  5. The stored procedure starts an atomic transaction that loads the data into the Account table and then deletes the data from the Account_Staging table.
  6. After the initial data is loaded, you update the flow to capture incremental updates from Salesforce. The flow trigger configuration is changed to scheduled, to capture data changes in Salesforce. This enables Snowflake to get all updates, deletes, and inserts in Salesforce at configured intervals.
  7. The flow uses the configured LastModifiedDate field to determine incremental changes.
  8. Steps 3, 4, and 5 are run again to load the incremental updates into the Snowflake Accounts table.

Prerequisites

To get started, you need the following prerequisites:

  • A Salesforce user account with sufficient privileges to install connected apps. Amazon AppFlow uses a connected app to communicate with Salesforce APIs. If you don’t have a Salesforce account, you can sign up for a developer account.
  • A Snowflake account with sufficient permissions to create and configure the integration, external stage, table, stored procedures, and tasks.
  • An AWS account with access to AWS Identity and Access Management (IAM), Amazon AppFlow, and Amazon S3.

Set up Snowflake configuration and Amazon S3 data

Complete the following steps to configure Snowflake and set up your data in Amazon S3:

  1. Create two S3 buckets in your AWS account: one for holding the data coming from Salesforce, and another for holding error records.

A best practice when creating your S3 bucket is to make sure you block public access to the bucket to ensure your data is not accessible by unauthorized users.

  1. Create an IAM policy named snowflake-access that allows listing the bucket contents and reading S3 objects inside the bucket.

Follow the instructions for steps 1 and 2 in Configuring a Snowflake Storage Integration to Access Amazon S3 to create an IAM policy and role. Replace the placeholders with your S3 bucket names.

  1. Log in to your Snowflake account and create a new warehouse called SALESFORCE and database called SALESTEST.
  2. Specify the format in which data will be available in Amazon S3 for Snowflake to load (for this post, CSV):
USE DATABASE SALESTEST;
CREATE or REPLACE file format my_csv_format
type = csv
field_delimiter = ','
Y skip_header = 1
null_if = ('NULL', 'null')
empty_field_as_null = true
compression = gzip;
  1. Amazon AppFlow uses the Snowflake COPY command to move data using an S3 bucket. To configure this integration, follow steps 3–6 in Configuring a Snowflake Storage Integration to Access Amazon S3.

These steps create a storage integration with your S3 bucket, update IAM roles with Snowflake account and user details, and creates an external stage.

This completes the setup in Snowflake. In the next section, you create the required objects in Snowflake.

Create schemas and procedures in Snowflake

In your Snowflake account, complete the following steps to create the tables, stored procedures, and tasks for implementing the use case:

  1. In your Snowflake account, open a worksheet and run the following DDL scripts to create the Account and Account_staging tables:
CREATE or REPLACE TABLE ACCOUNT_STAGING (
ACCOUNT_NUMBER STRING NOT NULL,
ACCOUNT_NAME STRING,
ACCOUNT_TYPE STRING,
ANNUAL_REVENUE NUMBER,
ACTIVE BOOLEAN NOT NULL,
DELETED BOOLEAN,
LAST_MODIFIED_DATE STRING,
primary key (ACCOUNT_NUMBER)
);

CREATE or REPLACE TABLE ACCOUNT (
ACCOUNT_NUMBER STRING NOT NULL,
ACCOUNT_NAME STRING,
ACCOUNT_TYPE STRING,
ANNUAL_REVENUE NUMBER,
ACTIVE BOOLEAN NOT NULL,
LAST_MODIFIED_DATE STRING,
primary key (ACCOUNT_NUMBER)
);
  1. Create a stored procedure in Snowflake to load data from staging to the Account table:
CREATE or REPLACE procedure sp_account_load( )
returns varchar not null
language sql
as
$$
begin
Begin transaction;
merge into ACCOUNT using ACCOUNT_STAGING
on ACCOUNT.ACCOUNT_NUMBER = ACCOUNT_STAGING.ACCOUNT_NUMBER
when matched AND ACCOUNT_STAGING.DELETED=TRUE then delete
when matched then UPDATE SET
ACCOUNT.ACCOUNT_NAME = ACCOUNT_STAGING.ACCOUNT_NAME,
ACCOUNT.ACCOUNT_TYPE = ACCOUNT_STAGING.ACCOUNT_TYPE,
ACCOUNT.ANNUAL_REVENUE = ACCOUNT_STAGING.ANNUAL_REVENUE,
ACCOUNT.ACTIVE = ACCOUNT_STAGING.ACTIVE,
ACCOUNT.LAST_MODIFIED_DATE = ACCOUNT_STAGING.LAST_MODIFIED_DATE
when NOT matched then
INSERT (
ACCOUNT.ACCOUNT_NUMBER,
ACCOUNT.ACCOUNT_NAME,
ACCOUNT.ACCOUNT_TYPE,
ACCOUNT.ANNUAL_REVENUE,
ACCOUNT.ACTIVE,
ACCOUNT.LAST_MODIFIED_DATE
)
values(
ACCOUNT_STAGING.ACCOUNT_NUMBER,
ACCOUNT_STAGING.ACCOUNT_NAME,
ACCOUNT_STAGING.ACCOUNT_TYPE,
ACCOUNT_STAGING.ANNUAL_REVENUE,
ACCOUNT_STAGING.ACTIVE,
ACCOUNT_STAGING.LAST_MODIFIED_DATE
) ;

Delete from ACCOUNT_STAGING;
Commit;
end;
$$
;

This stored procedure determines whether the data contains new records that need to be inserted or existing records that need to be updated or deleted. After a successful run, the stored procedure clears any data from your staging table.

  1. Create a task in Snowflake to trigger the stored procedure. Make sure that the time interval for this task is more than the time interval configured in Amazon AppFlow for pulling the incremental changes from Salesforce. The time interval should be sufficient for data to be processed.
CREATE OR REPLACE TASK TASK_ACCOUNT_LOAD
WAREHOUSE = SALESFORCE
SCHEDULE = 'USING CRON 5 * * * * America/Los_Angeles'
AS
call sp_account_load();
  1. Provide the required permissions to run the task and resume the task:
show tasks;
  • As soon as task is created it will be suspended state so needs to resume it manually first time
ALTER TASK TASK_ACCOUNT_LOAD RESUME;
  • If the role which is assigned to us doesn’t have proper access to resume/execute task needs to grant execute task privilege to that role
GRANT EXECUTE TASK, EXECUTE MANAGED TASK ON ACCOUNT TO ROLE SYSADMIN;

This completes the Snowflake part of configuration and setup.

Create a Salesforce connection

First, let’s create a Salesforce connection that can be used by AppFlow to authenticate and pull records from your Salesforce instance. On the AWS console, make sure you are in the same Region where your Snowflake instance is running.

  1. On the Amazon AppFlow console, choose Connections in the navigation pane.
  2. From the list of connectors, select Salesforce.
  3. Choose Create connection.
  4. For Connection name, enter a name of your choice (for example, Salesforce-blog).
  5. Leave the rest of the fields as default and choose Continue.
  6. You’re redirected to a sign-in page, where you need to log in to your Salesforce instance.
  7. After you allow Amazon AppFlow access to your Salesforce account, your connection is successfully created.
           

 Create a Snowflake connection

Complete the following steps to create your Snowflake connection:

  1. On the Connections menu, choose Snowflake.
  2. Choose Create connection.
  3. Provide information for the Warehouse, Stage name, and Bucket details fields.
  4. Enter your credential details.

  1. For Region, choose the same Region where Snowflake is running.
  2. For Connection name, name your connection Snowflake-blog.
  3. Leave the rest of the fields as default and choose Connect.

Create a flow in Amazon AppFlow

Now you create a flow in Amazon AppFlow to load the data from Salesforce to Snowflake. Complete the following steps:

  1. On the Amazon AppFlow console, choose Flows in the navigation pane.
  2. Choose Create flow.
  3. On the Specify flow details page, enter a name for the flow (for example, AccountData-SalesforceToSnowflake).
  4. Optionally, provide a description for the flow and tags.
  5. Choose Next.

  1. On the Configure flow page, for Source name¸ choose Salesforce.
  2. Choose the Salesforce connection we created in the previous step (Salesforce-blog).
  3. For Choose Salesforce object, choose Account.
  4. For Destination name, choose Snowflake.
  5. Choose the newly created Snowflake connection.
  6. For Choose Snowflake object, choose the staging table you created earlier (SALESTEST.PUBLIC. ACCOUNT_STAGING).

  1. In the Error handling section, provide your error S3 bucket.
  2. For Choose how to trigger the flow¸ select Run on demand.
  3. Choose Next.

  1. Select Manually map fields to map the fields between your source and destination.
  2. Choose the fields Account Number, Account Name, Account Type, Annual Revenue, Active, Deleted, and Last Modified Date.

  1. Map each source field to its corresponding destination field.
  2. Under Additional settings, leave the Import deleted records unchecked (default setting).

  1. In the Validations section, add validations for the data you’re pulling from Salesforce.

Because the schema for the Account_Staging table in Snowflake database has a NOT NULL constraint for the fields Account_Number and Active, records containing a null value for these fields should be ignored.

  1. Choose Add Validation to configure validations for these fields.
  2. Choose Next.

  1. Leave everything else as default, proceed to the final page, and choose Create Flow.
  2. After the flow is created, choose Run flow.

When the flow run completes successfully, it will bring all records into your Snowflake staging table.

Verify data in Snowflake

The data will be loaded into the Account_staging table. To verify that data is loaded in Snowflake, complete the following steps:

  1. Validate the number of records by querying the ACCOUNT_STAGING table in Snowflake.
  2. Wait for your Snowflake task to run based on the configured schedule.
  3. Verify that all the data is transferred to the ACCOUNT table and the ACCOUNT_STAGING table is truncated.

Configure an incremental data load from Salesforce

Now let’s configure an incremental data load from Salesforce:

  1. On the Amazon AppFlow console, select your flow, and choose Edit.
  2. Go to the Edit configuration step and change to Run flow on schedule.
  3. Set the flow to run every 5 minutes, and provide a start date of Today, with a start time in the future.
  4. Choose Incremental transfer and choose the LastModifiedDate field.
  5. Choose Next.
  6. In the Additional settings section, select Import deleted records.

This ensures that deleted records from the source are also ingested.

  1. Choose Save and then choose Activate flow.

Now your flow is configured to capture all incremental changes.

Test the solution

Log in to your Salesforce account, and edit any record in the Account object.

Within 5 minutes or less, a scheduled flow will pick up your change and write the changed record into your Snowflake staging table and trigger the synchronization process.

You can see the details of the run, including number of records transferred, on the Run History tab of your flow.

Clean up

Clean up the resources in your AWS account by completing the following steps:

  1. On the Amazon AppFlow console, choose Flows in the navigation pane.
  2. From the list of flows, select the flow AccountData-SalesforceToSnowflakeand delete it.
  3. Enter delete to delete the flow.
  4. Choose Connections in the navigation pane.
  5. Choose Salesforce from the list of connectors, select Salesforce-blog, and delete it.
  6. Enter delete to delete the connector.
  7. On the Connections page, choose Snowflake from the list of connectors, select Snowflake-blog, and delete it.
  8. Enter delete to delete the connector.
  9. On the IAM console, choose Roles in the navigation page, then select the role you created for Snowflake and delete it.
  10. Choose Policies in the navigation pane, select the policy you created for Snowflake, and delete it.
  11. On the Amazon S3 console, search for the data bucket you created, choose Empty to delete the objects, then delete the bucket.
  12. Search for the error bucket you created, choose Empty to delete the objects, then delete the bucket.
  13. Clean up resources in your Snowflake account:
  • Delete the task TASK_ACCOUNT_LOAD:
ALTER TASK TASK_ACCOUNT_LOAD SUSPEND;
DROP TASK TASK_ACCOUNT_LOAD;
  • Delete the stored procedure sp_account_load:
DROP procedure sp_account_load();
  • Delete the tables ACCOUNT_STAGING and ACCOUNT:
DROP TABLE ACCOUNT_STAGING;
DROP TABLE ACCOUNT;

Conclusion

In this post, we walked you through how to integrate and synchronize your data from Salesforce to Snowflake using Amazon AppFlow. This demonstrates how you can set up your ETL jobs without having to learn new programming languages by using Amazon AppFlow and your familiar SQL language. This is a proof of concept, but you can try to handle edge cases like failure of Snowflake tasks or understand how incremental transfer works by making multiple changes to a Salesforce record within the scheduled time interval.

For more information on Amazon AppFlow, visit Amazon AppFlow.


About the authors

Ramesh Ranganathan is a Senior Partner Solution Architect at AWS. He works with AWS customers and partners to provide guidance on enterprise cloud adoption, application modernization and cloud native development. He is passionate about technology and enjoys experimenting with AWS Serverless services.

Kamen Sharlandjiev is an Analytics Specialist Solutions Architect and Amazon AppFlow expert. He’s on a mission to make life easier for customers who are facing complex data integration challenges. His secret weapon? Fully managed, low-code AWS services that can get the job done with minimal effort and no coding.

Amit Shah is a cloud based modern data architecture expert and currently leading AWS Data Analytics practice in Atos. Based in Pune in India, he has 20+ years of experience in data strategy, architecture, design and development. He is on a mission to help organization become data-driven.

New — Amazon SageMaker Data Wrangler Supports SaaS Applications as Data Sources

Post Syndicated from Donnie Prakoso original https://aws.amazon.com/blogs/aws/new-amazon-sagemaker-data-wrangler-supports-saas-applications-as-data-sources/

Data fuels machine learning. In machine learning, data preparation is the process of transforming raw data into a format that is suitable for further processing and analysis. The common process for data preparation starts with collecting data, then cleaning it, labeling it, and finally validating and visualizing it. Getting the data right with high quality can often be a complex and time-consuming process.

This is why customers who build machine learning (ML) workloads on AWS appreciate the ability of Amazon SageMaker Data Wrangler. With SageMaker Data Wrangler, customers can simplify the process of data preparation and complete the required processes of the data preparation workflow on a single visual interface. Amazon SageMaker Data Wrangler helps to reduce the time it takes to aggregate and prepare data for ML.

However, due to the proliferation of data, customers generally have data spread out into multiple systems, including external software-as-a-service (SaaS) applications like SAP OData for manufacturing data, Salesforce for customer pipeline, and Google Analytics for web application data. To solve business problems using ML, customers have to bring all of these data sources together. They currently have to build their own solution or use third-party solutions to ingest data into Amazon S3 or Amazon Redshift. These solutions can be complex to set up and not cost-effective.

Introducing Amazon SageMaker Data Wrangler Supports SaaS Applications as Data Sources
I’m happy to share that starting today, you can aggregate external SaaS application data for ML in Amazon SageMaker Data Wrangler to prepare data for ML. With this feature, you can use more than 40 SaaS applications as data sources via Amazon AppFlow and have these data available on Amazon SageMaker Data Wrangler. Once the data sources are registered in AWS Glue Data Catalog by AppFlow, you can browse tables and schemas from these data sources using Data Wrangler SQL explorer. This feature provides seamless data integration between SaaS applications and SageMaker Data Wrangler using Amazon AppFlow.

Here is a quick preview of this new feature:

This new feature of Amazon SageMaker Data Wrangler works by using integration with Amazon AppFlow, a fully managed integration service that enables you to securely exchange data between SaaS applications and AWS services. With Amazon AppFlow, you can establish bidirectional data integration between SaaS applications, such as Salesforce, SAP, and Amplitude and all supported services, into your Amazon S3 or Amazon Redshift.

Then, with Amazon AppFlow, you can catalog the data in AWS Glue Data Catalog. This is a new feature where with Amazon AppFlow, you can create an integration with AWS Glue Data Catalog for Amazon S3 destination connector. With this new integration, customers can catalog SaaS data applications into AWS Glue Data Catalog with a few clicks, directly from the Amazon AppFlow Flow configuration, without the need to run any crawlers.

Once you’ve established a flow and inserted it into the AWS Glue Data Catalog, you can use this data inside the Amazon SageMaker Data Wrangler. Then, you can do the data preparation as you usually do. You can write Amazon Athena queries to preview data, join data from multiple sources, or import data to prepare for ML model training.

With this feature, you need to do a few simple steps to perform seamless data integration between SaaS applications into Amazon SageMaker Data Wrangler via Amazon AppFlow. This integration supports more than 40 SaaS applications, and for a complete list of supported applications, please check the Supported source and destination applications documentation.

Get Started with Amazon SageMaker Data Wrangler Support for Amazon AppFlow
Let’s see how this feature works in detail. In my scenario, I need to get data from Salesforce, and do the data preparation using Amazon SageMaker Data Wrangler.

To start using this feature, the first thing I need to do is to create a flow in Amazon AppFlow that registers the data source into the AWS Glue Data Catalog. I already have an existing connection with my Salesforce account, and all I need now is to create a flow.

One important thing to note is that to make SaaS application data available in Amazon SageMaker Data Wrangler, I need to create a flow with Amazon S3 as the destination. Then, I need to enable Create a Data Catalog table in the AWS Glue Data Catalog settings. This option will automatically catalog my Salesforce data into AWS Glue Data Catalog.

On this page, I need to select a user role with the required AWS Glue Data Catalog permissions and define the database name and the table name prefix. In addition, in this section, I can define the data format preference, be it in JSON, CSV, or Apache Parquet formats, and filename preference if I want to add a timestamp into the file name section.

To learn more about how to register SaaS data in Amazon AppFlow and AWS Glue Data Catalog, you can read Cataloging the data output from an Amazon AppFlow flow documentation page.

Once I’ve finished registering SaaS data, I need to make sure the IAM role can view the data sources in Data Wrangler from AppFlow. Here is an example of a policy in the IAM role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "glue:SearchTables",
            "Resource": [
                "arn:aws:glue:*:*:table/*/*",
                "arn:aws:glue:*:*:database/*",
                "arn:aws:glue:*:*:catalog"
            ]
        }
    ]
} 

By enabling data cataloging with AWS Glue Data Catalog, from this point on, Amazon SageMaker Data Wrangler will be able to automatically discover this new data source and I can browse tables and schema using the Data Wrangler SQL Explorer.

Now it’s time to switch to the Amazon SageMaker Data Wrangler dashboard then select Connect to data sources.

On the following page, I need to Create connection and select the data source I want to import. In this section, I can see all the available connections for me to use. Here I see the Salesforce connection is already available for me to use.

If I would like to add additional data sources, I can see a list of external SaaS applications that I can integrate into the Set up new data sources section. To learn how to recognize external SaaS applications as data sources, I can learn more with the select How to enable access.

Now I will import datasets and select the Salesforce connection.

On the next page, I can define connection settings and import data from Salesforce. When I’m done with this configuration, I select Connect.

On the following page, I see my Salesforce data that I already configured with Amazon AppFlow and AWS Glue Data Catalog called appflowdatasourcedb. I can also see a table preview and schema for me to review if this is the data I need.

Then, I start building my dataset using this data by performing SQL queries inside the SageMaker Data Wrangler SQL Explorer. Then, I select Import query.

Then, I define a name for my dataset.

At this point, I can start doing the data preparation process. I can navigate to the Analysis tab to run the data insight report. The analysis will provide me with a report on the data quality issues and what transform I need to use next to fix the issues based on the ML problem I want to predict. To learn more about how to use the data analysis feature, see Accelerate data preparation with data quality and insights in the Amazon SageMaker Data Wrangler blog post.

In my case, there are several columns I don’t need, and I need to drop these columns. I select Add step.

One feature I like is that Amazon SageMaker Data Wrangler provides numerous ML data transforms. It helps me to streamline the process of cleaning, transforming and feature engineering my data in one dashboard. For more about what SageMaker Data Wrangler provides for transformation data, please read this Transform Data documentation page.

In this list, I select Manage columns.

Then, in the Transform section, I select the Drop column option. Then, I select a few columns that I don’t need.

Once I’m done, the columns I don’t need are removed and the Drop column data preparation step I just created is listed in the Add step section.

I can also see the visual of my data flow inside the Amazon SageMaker Data Wrangler. In this example, my data flow is quite basic. But when my data preparation process becomes complex, this visual view makes it easy for me to see all the data preparation steps.

From this point on, I can do what I require with my Salesforce data. For example, I can export data directly to Amazon S3 by selecting Export to and choosing Amazon S3 from the Add destination menu. In my case, I specify Data Wrangler to store the data in Amazon S3 after it has processed it by selecting Add destination and then Amazon S3.

Amazon SageMaker Data Wrangler provides me flexibility to automate the same data preparation flow using scheduled jobs. I can also automate feature engineering with SageMaker Pipelines (via Jupyter Notebook) and SageMaker Feature Store (via Jupyter Notebook), and deploy to Inference end point with SageMaker Inference Pipeline (via Jupyter Notebook).

Things to Know
Related news – This feature will make it easy for you to do data aggregation and preparation with Amazon SageMaker Data Wrangler. As this feature is an integration with Amazon AppFlow and also AWS Glue Data Catalog, you might want to learn more on Amazon AppFlow now supports AWS Glue Data Catalog integration and provides enhanced data preparation page.

Availability – Amazon SageMaker Data Wrangler supports SaaS applications as data sources available in all the Regions currently supported by Amazon AppFlow.

Pricing – There is no additional cost to use SaaS applications supports in Amazon SageMaker Data Wrangler, but there is a cost to running Amazon AppFlow to get the data in Amazon SageMaker Data Wrangler.

Visit Import Data From Software as a Service (SaaS) Platforms documentation page to learn more about this feature, and follow the getting started guide to start data aggregating and preparing SaaS applications data with Amazon SageMaker Data Wrangler.

Happy building!
Donnie

Announcing Additional Data Connectors for Amazon AppFlow

Post Syndicated from Steve Roberts original https://aws.amazon.com/blogs/aws/announcing-additional-data-connectors-for-amazon-appflow/

Gathering insights from data is a more effective process if that data isn’t fragmented across multiple systems and data stores, whether on premises or in the cloud. Amazon AppFlow provides bidirectional data integration between on-premises systems and applications, SaaS applications, and AWS services. It helps customers break down data silos using a low- or no-code, cost-effective solution that’s easy to reconfigure in minutes as business needs change.

Today, we’re pleased to announce the addition of 22 new data connectors for Amazon AppFlow, including:

  • Marketing connectors (e.g., Facebook Ads, Google Ads, Instagram Ads, LinkedIn Ads).
  • Connectors for customer service and engagement (e.g., MailChimp, Sendgrid, Zendesk Sell or Chat, and more).
  • Business operations (Stripe, QuickBooks Online, and GitHub).

In total, Amazon AppFlow now supports over 50 integrations with various different SaaS applications and AWS services. This growing set of connectors can be combined to enable you to achieve 360 visibility across the data your organization generates. For instance, you could combine CRM (Salesforce), e-commerce (Stripe), and customer service (ServiceNow, Zendesk) data to build integrated analytics and predictive modeling that can guide your next best offer decisions and more. Using web (Google Analytics v4) and social surfaces (Facebook Ads, Instagram Ads) allows you to build comprehensive reporting for your marketing investments, helping you understand how customers are engaging with your brand. Or, sync ERP data (SAP S/4HANA) with custom order management applications running on AWS. For more information on the current range of connectors and integrations, visit the Amazon AppFlow integrations page.

Datasource connectors for Amazon AppFlow

Amazon AppFlow and AWS Glue Data Catalog
Amazon AppFlow has also recently announced integration with the AWS Glue Data Catalog to automate the preparation and registration of your SaaS data into the AWS Glue Data Catalog. Previously, customers using Amazon AppFlow to store data from supported SaaS applications into Amazon Simple Storage Service (Amazon S3) had to manually create and run AWS Glue Crawlers to make their data available to other AWS services such as Amazon Athena, Amazon SageMaker, or Amazon QuickSight. With this new integration, you can populate AWS Glue Data Catalog with a few clicks directly from the Amazon AppFlow configuration without the need to run any crawlers.

To simplify data preparation and improve query performance when using analytics engines such as Amazon Athena, Amazon AppFlow also now enables you to organize your data into partitioned folders in Amazon S3. Amazon AppFlow also automates the aggregation of records into files that are optimized to the size you specify. This increases performance by reducing processing overhead and improving parallelism.

You can find more information on the AWS Glue Data Catalog integration in the recent What’s New post.

Getting Started with Amazon AppFlow
Visit the Amazon AppFlow product page to learn more about the service and view all the available integrations. To help you get started, there’s also a variety of videos and demos available and some sample integrations available on GitHub. And finally, should you need a custom integration, try the Amazon AppFlow Connector SDK, detailed in the Amazon AppFlow documentation. The SDK enables you to build your own connectors to securely transfer data between your custom endpoint, application, or other cloud service to and from Amazon AppFlow‘s library of managed SaaS and AWS connectors.

— Steve

AWS Week in Review – June 20, 2022

Post Syndicated from Steve Roberts original https://aws.amazon.com/blogs/aws/aws-week-in-review-june-20-2022/

This post is part of our Week in Review series. Check back each week for a quick roundup of interesting news and announcements from AWS!

Last Week’s Launches
It’s been a quiet week on the AWS News Blog, however a glance at What’s New page shows the various service teams have been busy as usual. Here’s a round-up of announcements that caught my attention this past week.

Support for 15 new resource types in AWS Config – AWS Config is a service for assessment, audit, and evaluation of the configuration of resources in your account. You can monitor and review changes in resource configuration using automation against a desired configuration. The newly expanded set of types includes resources from Amazon SageMaker, Elastic Load Balancing, AWS Batch, AWS Step Functions, AWS Identity and Access Management (IAM), and more.

New console experience for AWS Budgets – A new split-view panel allows for viewing details of a budget without needing to leave the overview page. The new panel will save you time (and clicks!) when you’re analyzing performance across a set of budgets. By the way, you can also now select multiple budgets at the same time.

VPC endpoint support is now available in Amazon SageMaker Canvas SageMaker Canvas is a visual point-and-click service enabling business analysts to generate accurate machine-learning (ML) models without requiring ML experience or needing to write code. The new VPC endpoint support, available in all Regions where SageMaker Canvas is suppported, eliminates the need for an internet gateway, NAT instance, or a VPN connection when connecting from your SageMaker Canvas environment to services such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, and more.

Additional data sources for Amazon AppFlow – Facebook Ads, Google Ads, and Mixpanel are now supported as data sources, providing the ability to ingest marketing and product analytics for downstream analysis in AppFlow-connected software-as-a-service (SaaS) applications such as Marketo and Salesforce Marketing Cloud.

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Other AWS News
Some other updates you may have missed from the past week:

Amazon Elastic Compute Cloud (Amazon EC2) expanded the Regional availability of AWS Nitro System-based C6 instance types. C6gn instance types, powered by Arm-based AWS Graviton2 processors, are now available in the Asia Pacific (Seoul), Europe (Milan), Europe (Paris), and Middle East (Bahrain) Regions, while C6i instance types, powered by 3rd generation Intel Xeon Scalable processors, are now available in the Europe (Frankfurt) Region.

As a .NET and PowerShell Developer Advocate here at AWS, there are some news and updates related to .NET I want to highlight:

Upcoming AWS Events
The AWS New York Summit is approaching quickly, on July 12. Registration is also now open for the AWS Summit Canberra, an in-person event scheduled for August 31.

Microsoft SQL Server users may be interested in registering for the SQL Server Database Modernization webinar on June 21. The webinar will show you how to go about modernizing and how to cost-optimize SQL Server on AWS.

Amazon re:MARS is taking place this week in Las Vegas. I’ll be there as a host of the AWS on Air show, along with special guests highlighting their latest news from the conference. I also have some On Air sessions on using our AI services from .NET lined up! As usual, we’ll be streaming live from the expo hall, so if you’re at the conference, give us a wave. You can watch the show live on Twitch.tv/aws, Twitter.com/AWSOnAir, and LinkedIn Live.

A reminder that if you’re a podcast listener, check out the official AWS Podcast Update Show. There is also the latest installment of the AWS Open Source News and Updates newsletter to help keep you up to date.

No doubt there’ll be a whole new batch of releases and announcements from re:MARS, so be sure to check back next Monday for a summary of the announcements that caught our attention!

— Steve

Combining Amazon AppFlow with AWS Step Functions to maximize application integration benefits

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/combining-amazon-appflow-with-aws-step-functions-to-maximize-application-integration-benefits/

This post is written by Ahmad Aboushady, Senior Technical Account Manager and Kamen Sharlandjiev, Senior Specialist Solution Architect, Integration.

In this blog post, you learn how to orchestrate AWS service integrations to reduce the manual steps in your workflow. The example uses AWS Step Functions SDK integration to integrate Amazon AppFlow and AWS Glue catalog without writing custom code. It automatically uses Amazon EventBridge to trigger Step Functions every time a new Amazon AppFlow flow finishes running.

Amazon AppFlow enables customers to transfer data securely between software as a service (SaaS) applications, like Salesforce, SAP, Zendesk, Slack, ServiceNow, and multiple AWS services.

An everyday use case of Amazon AppFlow is creating a customer-360 by integrating marketing, customer support, and sales data. For example, analyze the revenue impact of different marketing channels by synchronizing the revenue data from Salesforce with marketing data from Adobe Marketo.

This involves setting up flows to ingest data from different data sources and SaaS applications to AWS Data Lake based on Amazon S3. It uses AWS Glue to crawl and catalog this data. Customers use this catalog to access data quickly in several ways.

For example, they query the data using Amazon Athena or Amazon QuickSight for visualizations, business intelligence and anomaly detection. You can create those data flows quickly with no code required. However, to complete the next set of requirements, customers often go through multiple manual steps of provisioning and configuring different AWS resources. One such step requires creating AWS Glue crawler and running it with every Amazon AppFlow flow execution.

Step Functions can help us automate this process. This is a low-code workflow orchestration service that offers a visual workflow designer. You can quickly build workflows using the built-in drag-and-drop interface available in the AWS Management Console.

You can follow this blog and build your end-to-end state machine using the Step Functions Workflow Studio, or use the AWS Serverless Application Model (AWS SAM) template to deploy the example. The Step Functions state machine uses SDK integration with other AWS Services, so you don’t need to write any custom integration code.

Overview

The following diagram depicts the workflow with the different states in the state machine. You can group these into three phases: preparation, processing, and configuration.

  • The preparation phase captures all the configuration parameters and collects information about the metadata of the data, ingested by Amazon AppFlow.
  • The processing phase generates the AWS Glue table definition and sets the required parameters based on the destination file type. It iterates through the different columns and adds them as part of the table definition.
  • The last phase provides the Glue Catalog resources by creating or updating an existing AWS Glue table. With each Amazon AppFlow flow execution, the state machine determines if a new Glue table partition is required.

Workflow architecture

Preparation phase

The first state, “SetDatabaseAndContext”, is a pass state where you set the configuration parameters used in later states. Set the AWS Glue database and table name and capture the details of the data flow. You can do this by using the parameters filter to build a new JSON payload using parts of the state input similar to:

"Parameters": {
        "Config": {
          "Database": "<Glue-Database-Name>",
          "TableName.$": "$.detail['flow-name']",
          "detail.$": "$.detail"
        }
}

The following state, “DatabaseExist?” is an AWS SDK integration using a “GetDatabase” call to AWS Glue to ensure that the database exists. Here, the state uses error handling to intercept exception messages from the SDK call. This feature splits the workflow and adds an extra step if needed.

In this case, the SDK call returns an exception if the database does not exist, and the workflow invokes the “CreateDatabase” state. It moves to the “CleanUpError” state to clean up any errors and set the configuration parameters accordingly. Afterwards, with the database in place, the workflow continues to the next state: “DescribeFlow”. This returns the metadata of the Amazon AppFlow flow. Part of this metadata is the list of the object fields, which you must create in the Glue table and partitions.

Here is an error handling state that catches exceptions and routes the flow to execute an extra step:

"Catch": [
  {
    "ErrorEquals": [
      "States.ALL"
    ],
    "Comment": "Create Glue Database",
    "Next": "CreateDatabase",
    "ResultPath": "$.error"
  }
]

In the next state, “DescribeFlow”, you use the AWS SDK integration to get the Amazon AppFlow flow configuration. This uses the Amazon AppFlow “DescribeFlow API call. It moves to “S3AsDestination?”, which is a choice state to check if S3 is a destination for the flow. Amazon AppFlow allows you to bring data into different purpose-built data stores, such as S3, Amazon Redshift, or external SaaS or data warehouse applications. This automation can only continue if the configured destination is S3.

The choice definition is:

"Choices": [
  {
    "Variable": "$.FlowConfig.DestinationFlowConfigList[0].ConnectorType",
    "StringEquals": "S3",
    "Next": "GenerateTableDefinition"
  }
],
"Default": "S3NotDestination"

Processing phase

The following state generates the base AWS Glue table definition based on the destination file type. Then it uses a map state to iterate and transform the Amazon AppFlow schema output into what the AWS Glue Data Catalog expects as input.

Next, add the “GenerateTableDefinition” state and use the parameters filter to build a new JSON payload output. Finally, use the information from the “DescribeFlow” state similar to:

"Parameters": {
  "Config.$": "$.Config",
  "FlowConfig.$": "$.FlowConfig",
  "TableInput": {
    "Description": "Created by AmazonAppFlow",
    "Name.$": "$.Config.TableName",
    "PartitionKeys": [
      {
        "Name": "partition_0",
        "Type": "string"
      }
    ],
    "Retention": 0,
    "Parameters": {
      "compressionType": "none",
      "classification.$": "$.FlowConfig.DestinationFlowConfigList[0].DestinationConnectorProperties['S3'].S3OutputFormatConfig.FileType",
      "typeOfData": "file"
    },
    "StorageDescriptor": {
      "BucketColumns": [],
      "Columns.$": "$.FlowConfig.Tasks[?(@.TaskType == 'Map')]",
      "Compressed": false,
      "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
      "Location.$": "States.Format('{}/{}/', $.Config.detail['destination-object'], $.FlowConfig.FlowName)",
      "NumberOfBuckets": -1,
      "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
      "SortColumns": [],
      "StoredAsSubDirectories": false
    },
    "TableType": "EXTERNAL_TABLE"
  }
}

The following state, “DestinationFileFormatEvaluator”, is a choice state to change the JSON payload according to the destination file type. Amazon AppFlow supports different file type conversions when S3 is the destination of your data. These formats are CSV, Parquet, and JSON Lines. AWS Glue uses various serialization libraries according to the file type.

You iterate within a map state to transform the AWS Glue table schema and set the column type to a known AWS Glue format. If the file type is unrecognized or does not have an equivalent in Glue, default this field to string. The map state configuration is defined as:

"Iterator": {
        "StartAt": "KnownFIleFormat?",
        "States": {
          "KnownFIleFormat?": {
            "Type": "Choice",
            "Choices": [
              {
                "Or": [
                  {
                    "Variable": "$.TaskProperties.SOURCE_DATA_TYPE",
                    "StringEquals": "boolean"
                  },
                  {
                    "Variable": "$.TaskProperties.SOURCE_DATA_TYPE",
                    "StringEquals": "double"
                  },
                  .
                  .
                  .
                  .
                  {
                    "Variable": "$.TaskProperties.SOURCE_DATA_TYPE",
                    "StringEquals": "timestamp"
                  }
                ],
                "Next": "1:1 mapping"
              }
            ],
            "Default": "Cast to String"
          },
          "1:1 mapping": {
            "Type": "Pass",
            "End": true,
            "Parameters": {
              "Name.$": "$.DestinationField",
              "Type.$": "$.TaskProperties.SOURCE_DATA_TYPE"
            }
          },
          "Cast to String": {
            "Type": "Pass",
            "End": true,
            "Parameters": {
              "Name.$": "$.DestinationField",
              "Type": "string"
            }
          }
        }
      },
"ItemsPath": "$.TableInput.StorageDescriptor.Columns",
"ResultPath": "$.TableInput.StorageDescriptor.Columns",

Configuration phase

The next stage in the workflow is “TableExist?”, which checks if the Glue table exists. If the state machine detects any error because the table does not exist, it moves to the “CreateTable” state. Alternatively, it goes to the “UpdateTable” state.

Both states use the AWS SDK integration to create or update the AWS Glue table definition using the “TableInput” parameter. AWS Glue operates with partitions. Every time you have new data stored in a new S3 prefix, you must update the table and add a new partition showing where the data sits.

You need an extra step to check if Amazon AppFlow has stored the data into a new S3 prefix or an existing one. In the “AddPartition?” State, you must review and determine the next step of your workflow. For example, you must validate that the flow executed successfully and processed data.

A choice state helps with those checks:

"And": [
            {
              "Variable": "$.Config.detail['execution-id']",
              "IsPresent": true
            },
            {
              "Variable": "$.Config.detail['status']",
              "StringEquals": "Execution Successful"
            },
            {
              "Not": {
                "Variable": "$.Config.detail['num-of-records-processed']",
                "StringEquals": "0"
              }
            }
          ]

Amazon AppFlow supports different types of flow execution. With scheduled flows, you can regularly configure Amazon AppFlow to hydrate a data lake by bringing only new data since its last execution. Sometimes, after a successful flow execution, there is no new data to ingest. The workflow concludes and moves to the success state in such cases. However, if there is new data, the state machine continues to the next state, “SingleFileAggregation?”.

Amazon AppFlow supports different file aggregation strategies and allows you to aggregate all ingested records into a single or multiple files. Depending on your flow configuration, it may store your data in a different S3 prefix with each flow execution.

In this state, you check this configuration to decide if you need a new partition for your AWS Glue table.

"Variable": "$.FlowConfig.DestinationFlowConfigList[0].DestinationConnectorProperties.S3.S3OutputFormatConfig.AggregationConfig.AggregationType",
"StringEquals": "SingleFile"

If the data flow aggregates all records into a single file per flow execution, it stores all data into a single S3 prefix. In this case, there is a single partition in your AWS Glue table. You must create that single partition the first time this state machine executes for a specific flow.

Use the AWS SDK integration to get the table partition from the AWS Glue in the “IsPartitionExist?” state. Conclude the workflow and move to the “Success” state if the partition exists. Otherwise, create that single partition in another state, “CreateMainPartition”.

If the flow run does not aggregate files, every flow run generates multiple files into a new S3 prefix. In this case, you add a new partition to the AWS Glue table. A pass state, “ConfigureDestination”, configures the required parameters for the partition creation:

"Parameters": {
        "InputFormat.$": "$.TableInput.StorageDescriptor.InputFormat",
        "OutputFormat.$": "$.TableInput.StorageDescriptor.OutputFormat",
        "Columns.$": "$.TableInput.StorageDescriptor.Columns",
        "Compressed.$": "$.TableInput.StorageDescriptor.Compressed",
        "SerdeInfo.$": "$.TableInput.StorageDescriptor.SerdeInfo",
        "Location.$": "States.Format('{}{}', $.TableInput.StorageDescriptor.Location, $.Config.detail['execution-id'])"
      },
 "ResultPath": "$.TableInput.StorageDescriptor"

Next, move to the “CreateNewPartition” state to use the AWS SDK integration to create a new partition to the Glue table similar to:

"Parameters": {
        "DatabaseName.$": "$.Config.Database",
        "TableName.$": "$.Config.TableName",
        "PartitionInput": {
          "Values.$": "States.Array($.Config.detail['execution-id'])",
          "StorageDescriptor.$": "$.TableInput.StorageDescriptor"
        }
      },
"Resource": "arn:aws:states:::aws-sdk:glue:createPartition"

This concludes the workflow with a “Succeed” state after configuring the AWS Glue table in response to the new Amazon AppFlow flow run.

Conclusion

This blog post explores how to integrate Amazon AppFlow and AWS Glue using Step Functions to automate your business requirements. You can use AWS Lambda to simplify the configuration phase and reduce state transitions or create complex checks, filters, or even data cleansing and preparation.

This approach allows you to tailor the schema conversion to your business requirements. Use this AWS SAM template, to deploy this example. This provides the Step Functions workflow described in this post and the EventBridge rule to trigger the state machine after each Amazon AppFlow flow run. The template also includes all required IAM roles and permissions.

For more serverless learning resources, visit Serverless Land.

Building custom connectors using the Amazon AppFlow Custom Connector SDK

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/building-custom-connectors-using-the-amazon-appflow-custom-connector-sdk/

This post is written by Kamen Sharlandjiev, Sr. Specialist SA, Integration, Ray Jang, Principal PMT, Amazon AppFlow, and Dhiraj Mahapatro, Sr. Specialist SA, Serverless.

Amazon AppFlow is a fully managed integration service that enables you to transfer data securely between software as a service (SaaS) applications like Salesforce, SAP, Zendesk, Slack, ServiceNow, and AWS services like Amazon S3 and Amazon Redshift. Amazon AppFlow lets you run enterprise-scale data flows on a schedule, in response to business events, or on-demand.

Overview diagram

Amazon AppFlow is a managed integration service that replaces the heavy-lifting of developing, maintaining, and updating connectors. It supports bidirectional integration between external SaaS applications and AWS services.

The Custom Connector Software Development Kit (SDK) now makes it easier to integrate with private API endpoints, proprietary applications, or other cloud services. It provides access to all available managed integrations and the ability to build your own custom integration as part of the integrated experience. The SDK is open-source and available for Java or Python.

You can deploy custom connectors built with the SDK in different ways:

  • Private – The connector is available only inside the AWS account where deployed.
  • Shared – The connector can be shared for use with other AWS accounts.
  • Public – Publish connectors on the AWS Marketplace for free or charge a subscription fee. For more information, refer to Sharing AppFlow connectors via AWS Marketplace.

Overview

This blog takes you through building and deploying your own Amazon AppFlow Custom Connector using the Java SDK. The sample application shows how to build your first custom connector with Amazon AppFlow.

Custom connector flow

The process of building, deploying, and using a custom connector is:

  1. Create a custom connector as an AWS Lambda function using the Amazon AppFlow Custom Connector SDK.
  2. Deploy the custom connector Lambda function, which provides the serverless compute for the connector.
  3. Lambda function integrates with a SaaS application or private API.
  4. Register the custom connector with Amazon AppFlow.
  5. Users can now use this custom connector in the Amazon AppFlow service.

Building an Amazon AppFlow custom connector

The sample application used in this blog creates a new custom connector that implements a MySQL JDBC driver. With this connector, you can connect to a remote MySQL or MariaDB instance to read and write data.

The SDK allows you to build custom connectors and use the service’s built-in authentication support for: OAuth2, API key, and basic auth. For other use cases, such as JDBC, you must create your own custom authentication implementation.

The SDK includes the source code for an example Salesforce connector. This highlights a complete use case for a source and destination Amazon AppFlow connector using OAuth2 as authentication.

Details

There are three mandatory Java interfaces that a connector must implement:

  1. ConfigurationHandler.java: Defines the functionality to implement connector configurations, and credentials-related operations.
  2. MetadataHandler.java: Represents the functionality to implement for objects metadata.
  3. RecordHandler.java: Defines functionality to implement record-related CRUD operations.

Prerequisites

Ensure that the following software is installed on your workstation:

  1. Java 11
  2. Maven
  3. AWS CLI
  4. AWS SAM CLI

To run the sample application:

  1. Clone the code repository:
    git clone https://github.com/aws-samples/amazon-appflow-custom-jdbc-connector.git
    
    cd amazon-appflow-custom-jdbc-connector
  2. After cloning the sample application, visit these Java classes for more details:

To add JDBC clients for other database engines, implement JDBCClient.java interface. The custom connector uses a Lambda function as a POJO class to handle requests. The SDK provides an abstract BaseLambdaConnectorHandler class that, which you use as follows:

import com.amazonaws.appflow.custom.connector.lambda.handler.BaseLambdaConnectorHandler;

public class JDBCConnectorLambdaHandler extends BaseLambdaConnectorHandler {

  public JDBCConnectorLambdaHandler() {
    super(
      new JDBCConnectorMetadataHandler(),
      new JDBCConnectorRecordHandler(),
      new JDBCConnectorConfigurationHandler()
    );
  }
}

Local testing and debugging

While developing the connector specific functionality, developers require local testing capability to build and debug faster. The SDK and the example connector provides examples on testing custom connectors.

Additionally, you can experiment with JUnit and the DSL builders provided by the SDK. The JUnit test allows you to test this implementation locally by simulating an appropriate request to the Lambda functions. You can use debug points and step into the code implementation from start to end using the built-in IDE debugger. The sample application comes with example of JUnit tests that can be used with debug points.

Credentials management 

Amazon AppFlow stores all sensitive information in AWS Secrets Manager. The secret is created when you create a connector profile. The secret ARN is passed in the ConnectorContext that forms part of the Lambda function’s invocation request.

To test locally:

  • Mock the “CredentialsProvider” and stub out the response of GetCredentials API. Note that the CredentialProvider provides several different GetCredentials methods, depending on the authentication used.
  • Create a secret in AWS Secrets Manager. Configure an IAM user with programmatic access and sufficient permissions to allow the secretsmanager:GetSecretValue action and let the CredentialsProvider call Secrets Manager locally. When you initialize a new service client without supplying any arguments, the SDK attempts to find AWS credentials by using the default credential provider chain.

For more information, read Working with AWS Credentials (SDK for Java) and Creating an IAM user with programmatic access.

Deploying the Lambda function in an AWS account

This example connector package provides an AWS Serverless Application Model (AWS SAM) template in the project folder. It describes the following resources:

  1. The Lambda function containing the custom connector code.
  2. The AWS IAM policy, allowing the function to read secrets from AWS Secrets Manager.
  3. The AWS Lambda policy permission allowing Amazon AppFlow to invoke the Lambda function.

The sample application’s AWS SAM template provides two resources:

AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Description: Template to deploy the lambda connector in your account.
Resources:
  ConnectorFunction:
    Type: 'AWS::Serverless::Function'
    Properties:
      Handler: "org.custom.connector.jdbc.handler.JDBCConnectorLambdaHandler::handleRequest"
      CodeUri: "./target/appflow-custom-jdbc-connector-jdbc-1.0.jar"
      Description: "AppFlow custom JDBC connector example"
      Runtime: java11
      Timeout: 30
      MemorySize: 1024
      Policies:
        Version: '2012-10-17'
        Statement:
          Effect: Allow
          Action: 'secretsmanager:GetSecretValue'
          Resource: !Sub 'arn:aws:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:appflow!${AWS::AccountId}-*'

  PolicyPermission:
    Type: 'AWS::Lambda::Permission'
    Properties:
      FunctionName: !GetAtt ConnectorFunction.Arn
      Action: lambda:InvokeFunction
      Principal: 'appflow.amazonaws.com'
      SourceAccount: !Ref 'AWS::AccountId'
      SourceArn: !Sub 'arn:aws:appflow:${AWS::Region}:${AWS::AccountId}:*'

Deploy this custom connector by using the following command from the amazon-appflow-custom-jdbc-connector base directory:

mvn package && sam deploy –-guided

Once deployment completes, follow below steps to register and use the connector.

Registering the custom connector

There are two ways to register the custom connector.

1. Register through the AWS Management Console

  1. From the AWS Management Console, navigate to Amazon AppFlow. Select Connectors on the left-side menu. Choose on the “Register New Connector” button.
  2. Register the connector by selecting your Lambda function and typing in the connector label.
    Register a new connectorThe newly created custom connector Lambda function appears in the list if you deployed using AWS SAM by following the steps in this tutorial. If you deployed the Lambda function manually, ensure that appropriate Lambda permissions are set, as described in the Lambda Permissions and Resource Policy section.
  3. Provide a label for the connector. The label must be unique per account per Region. Choose Register.
    Provide a label
  4. The connector appears in the list of custom connectors.
    List of connectors

2. Register with the API

Invoke the registerConnector public API endpoint with the following request payload:

{
   "connectorLabel":"TestCustomConnector",
   "connectorProvisioningType":"LAMBDA",
   "connectorProvisioningConfig":{
      "lambda":{ "lambdaArn":"arn:aws:lambda:<region>:<aws_account_id>:function:<lambdaFunctionName>"
      }
   }
}

For connectorLabel, use a unique label. Currently, the only supported connectorProvisioningType is LAMBDA.

Using the new custom connector

  1. Navigate to the Connections link from the left-menu. Select the registered connector from the drop-down.
    Selecting the connector
  2. Choose Create Connection.
    Choose Create Connection
  3. Complete the connector-specific setup:
    Setup page
  4. Proceed with creating a flow and selecting your new connection.
  5. Check Lambda function’s Amazon CloudWatch Logs to troubleshoot errors, if any, during connector registration, connector profile creation, and flow execution process.

Production considerations

This example is a proof of concept. To build a production-ready solution, review the non-exhaustive list of differences between sample and production-ready solutions.

If you plan to use the custom connector with high concurrency, review AWS Lambda quotas and limitations.

Cleaning up the custom connector stack

To delete the connector:

  1. Delete all flows in Amazon AppFlow that you created as part of this tutorial.
  2. Delete any connector profiles.
  3. Unregister the custom connector.
  4. To delete the stack, run the following command from the amazon-appflow-custom-jdbc-connector base directory:
    sam delete

Conclusion

This blog post shows how to extend the Amazon AppFlow service to move data between SaaS endpoints and custom APIs. You can now build custom connectors using the Amazon AppFlow Custom Connector SDK.

Using custom connectors in Amazon AppFlow allows you to integrate siloed applications with minimal code. For example, different business units using legacy applications in an organization can now integrate their services via the Amazon AppFlow Custom Connectors SDK.

Depending on your choice of framework you can use the open source Python SDK or Java SDK from GitHub. To learn more, refer to the Custom Connector SDK Developer Guide.

For more serverless learning resources, visit Serverless Land.

Build a modern data architecture on AWS with Amazon AppFlow, AWS Lake Formation, and Amazon Redshift.

Post Syndicated from Dr. Yannick Misteli original https://aws.amazon.com/blogs/big-data/build-a-modern-data-architecture-on-aws-with-amazon-appflow-aws-lake-formation-and-amazon-redshift/

This is a guest post written by Dr. Yannick Misteli, lead cloud platform and ML engineering in global product strategy (GPS) at Roche.

Recently the Roche Data Insights (RDI) initiative was launched to achieve our vision using new ways of working and collaboration in order to build shared, interoperable data & insights with federated governance. Furthermore, a simplified & integrated data landscape shall be established in order to empower insights communities. One of the first domains to engage in this program is the Go-to-Market (GTM) area which comprises sales, marketing, medical access and market affairs in Roche. GTM domain enables Roche to understand customers and to ultimately create and deliver valuable services that meet their needs. GTM as a domain extends beyond health care professionals (HCPs) to a larger healthcare ecosystem consisting of patients, communities, health authorities, payers, providers, academia, competitors, so on and so forth. Therefore, Data & Analytics are key in supporting the internal and external stakeholders in their decision-making processes through actionable insights.

Roche GTM built a modern data and machine learning (ML) platform on AWS while utilizing DevOps best practices. The mantra of everything as code (EaC) was key in building a fully automated, scalable data lake and data warehouse on AWS.

In this this post, you learn about how Roche used AWS products and services such as Amazon AppFlow, AWS Lake Formation, and Amazon Redshift to provision and populate their data lake; how they sourced, transformed, and loaded data into the data warehouse; and how they realized best practices in security and access control.

In the following sections, you dive deep into the scalable, secure, and automated modern data platform that Roche has built. We demonstrate how to automate data ingestion, security standards, and utilize DevOps best practices to ease management of your modern data platform on AWS.

Data platform architecture

The following diagram illustrates the data platform architecture.

The architecture contains the following components:

Lake Formation security

We use Lake Formation to secure all data as it lands in the data lake. Separating each data lake layer into distinct S3 buckets and prefixes enables fine-grained access control policies that Lake Formation implements. This concept also extends to locking down access to specific rows and columns and applying policies to specific IAM roles and users. Governance and access to data lake resources is difficult to manage, but Lake Formation simplifies this process for administrators.

To secure access to the data lake using Lake Formation, the following steps are automated using the AWS CDK with customized constructs:

  1. Register the S3 data buckets and prefixes, and corresponding AWS Glue databases with Lake Formation.
  2. Add data lake administrators (GitLab runner IAM deployment role and administrator IAM role).
  3. Grant the AWS Glue job IAM roles access to the specific AWS Glue databases.
  4. Grant the AWS Lambda IAM role access to the Amazon AppFlow databases.
  5. Grant the listed IAM roles access to the corresponding tables in the AWS Glue databases.

AWS Glue Data Catalog

The AWS Glue Data Catalog is the centralized registration and access point for all databases and tables that are created in both the data lake and in Amazon Redshift. This provides centralized transparency to all resources along with their schemas and the location of all data that is referenced. This is a critical aspect for any data operations performed within the lake house platform.

Data sourcing and ingestion

Data is sourced and loaded into the data lake through the use of AWS Glue jobs and Amazon AppFlow. The ingested data is made available in the Amazon Redshift data warehouse through Amazon Redshift Spectrum using external schemas and tables. The process of creating the external schemas and linking it to the Data Catalog is outlined later in this post.

Amazon AppFlow Salesforce ingestion

Amazon AppFlow is a fully-managed integration service that allows you to pull data from sources such as Salesforce, SAP, and Zendesk. Roche integrates with Salesforce to load Salesforce objects securely into their data lake without needing to write any custom code. Roche also pushes ML results back to Salesforce using Amazon AppFlow to facilitate the process.

Salesforce objects are first fully loaded into Amazon S3 and then are flipped to a daily incremental load to capture deltas. The data lands in the raw zone bucket in Parquet format using the date as a partition. The Amazon AppFlow flows are created through the use of a YAML configuration file (see the following code). This configuration is consumed by the AWS CDK deployment to create the corresponding flows.

appflow:
  flow_classes:
    salesforce:
      source: salesforce
      destination: s3
      incremental_load: 1
      schedule_expression: "rate(1 day)"
      s3_prefix: na
      connector_profile: roche-salesforce-connector-profile1,roche-salesforce-connector-profile2
      description: appflow flow flow from Salesforce
      environment: all
  - name: Account
    incremental_load: 1
    bookmark_col: appflow_date_str
  - name: CustomSalesforceObject
    pii: 0
    bookmark_col: appflow_date_str
    upsert_field_list: upsertField
    s3_prefix: prefix
    source: s3
    destination: salesforce
    schedule_expression: na
    connector_profile: roche-salesforce-connector-profile

The YAML configuration makes it easy to select whether data should be loaded from an S3 bucket back to Salesforce or from Salesforce to an S3 bucket. This configuration is subsequently read by the AWS CDK app and corresponding stacks to translate into Amazon AppFlow flows.

The following options are specified in the preceding YAML configuration file:

  • source – The location to pull data from (Amazon S3, Salesforce)
  • destination – The destination to put data to (Amazon S3, Salesforce)
  • object_name – The name of the Salesforce object to interact with
  • incremental_load – A Boolean specifying if the load should be incremental or full (0 means full, 1 means incremental)
  • schedule_expression – The cron or rate expression to run the flow (na makes it on demand)
  • s3_prefix – The prefix to push or pull the data from in the S3 bucket
  • connector_profile – The Amazon AppFlow connector profile name to use when connecting to Salesforce (can be a CSV list)
  • environment – The environment to deploy this Amazon AppFlow flow to (all means deploy to dev and prod, dev means development environment, prod means production environment)
  • upsert_field_list – The set of Salesforce object fields (can be a CSV list) to use when performing an upsert operation back to Salesforce (only applicable when loaded data back from an S3 bucket back to Salesforce)
  • bookmark_col – The name of the column to use in the Data Catalog for registering the daily load date string partition

Register Salesforce objects to the Data Catalog

Complete the following steps to register data loaded into the data lake with the Data Catalog and link it to Amazon Redshift:

  1. Gather Salesforce object fields and corresponding data types.
  2. Create a corresponding AWS Glue database in the Data Catalog.
  3. Run a query against Amazon Redshift to create an external schema that links to the AWS Glue database.
  4. Create tables and partitions in the AWS Glue database and tables.

Data is accessible via the Data Catalog and the Amazon Redshift cluster.

Amazon AppFlow dynamic field gathering

To construct the schema of the loaded Salesforce object in the data lake, you invoke the following Python function. The code utilizes an Amazon AppFlow client from Boto3 to dynamically gather the Salesforce object fields to construct the Salesforce object’s schema.

import boto3

client = boto3.client('appflow')

def get_salesforce_object_fields(object_name: str, connector_profile: str):
    """
    Gathers the Salesforce object and its corresponding fields.

    Parameters:
        salesforce_object_name (str) = the name of the Salesforce object to consume.
        appflow_connector_profile (str) = the name of AppFlow Connector Profile.

    Returns:
        object_schema_list (list) =  a list of the object's fields and datatype (a list of dictionaries).
    """
    print("Gathering Object Fields")

    object_fields = []

    response = client.describe_connector_entity(
        connectorProfileName=connector_profile,
        connectorEntityName=object_name,
        connectorType='Salesforce'
    )

    for obj in response['connectorEntityFields']:
        object_fields.append(
            {'field': obj['identifier'], 'data_type': obj['supportedFieldTypeDetails']['v1']['fieldType']})

    return object_fields

We use the function for both the creation of the Amazon AppFlow flow via the AWS CDK deployment and for creating the corresponding table in the Data Catalog in the appropriate AWS Glue database.

Create an Amazon CloudWatch Events rule, AWS Glue table, and partition

To add new tables (one per Salesforce object loaded into Amazon S3) and partitions into the Data Catalog automatically, you create an Amazon CloudWatch Events rule. This function enables you to query the data in both AWS Glue and Amazon Redshift.

After the Amazon AppFlow flow is complete, it invokes a CloudWatch Events rule and a corresponding Lambda function to either create a new table in AWS Glue or add a new partition with the corresponding date string for the current day. The CloudWatch Events rule looks like the following screenshot.

The invoked Lambda function uses the Amazon SageMaker Data Wrangler Python package to interact with the Data Catalog. Using the preceding function definition, the object fields and their data types are accessible to pass to the following function call:

import awswrangler as wr

def create_external_parquet_table(
    database_name: str, 
    table_name: str, 
    s3_path: str, 
    columns_map: dict, 
    partition_map: dict
):
    """
    Creates a new external table in Parquet format.

    Parameters:
        database_name (str) = the name of the database to create the table in.
        table_name (str) = the name of the table to create.
        s3_path (str) = the S3 path to the data set.
        columns_map (dict) = a dictionary object containing the details of the columns and their data types from appflow_utility.get_salesforce_object_fields
        partition_map (dict) = a map of the paritions for the parquet table as {'column_name': 'column_type'}
    
    Returns:
        table_metadata (dict) = metadata about the table that was created.
    """

    column_type_map = {}

    for field in columns_map:
        column_type_map[field['name']] = field['type']

    return wr.catalog.create_parquet_table(
        database=database_name,
        table=table_name,
        path=s3_path,
        columns_types=column_type_map,
        partitions_types=partition_map,
        description=f"AppFlow ingestion table for {table_name} object"
    )

If the table already exists, the Lambda function creates a new partition to account for the date in which the flow completed (if it doesn’t already exist):

import awswrangler as wr

def create_parquet_table_date_partition(
    database_name: str, 
    table_name: str, 
    s3_path: str, 
    year: str, 
    month: str, 
    day: str
):
    """
    Creates a new partition by the date (YYYY-MM-DD) on an existing parquet table.

    Parameters:
        database_name (str) = the name of the database to create the table in.
        table_name (str) = the name of the table to create.
        s3_path (str) = the S3 path to the data set.
        year(str) = the current year for the partition (YYYY format).
        month (str) = the current month for the partition (MM format).
        day (str) = the current day for the partition (DD format).
    
    Returns:
        table_metadata (dict) = metadata about the table that has a new partition
    """

    date_str = f"{year}{month}{day}"
    
    return wr.catalog.add_parquet_partitions(
        database=database_name,
        table=table_name,
        partitions_values={
            f"{s3_path}/{year}/{month}/{day}": [date_str]
        }
    )
    
def table_exists(
    database_name: str, 
    table_name: str
):
    """
    Checks if a table exists in the Glue catalog.

    Parameters:
        database_name (str) = the name of the Glue Database where the table should be.
        table_name (str) = the name of the table.
    
    Returns:
        exists (bool) = returns True if the table exists and False if it does not exist.
    """

    try:
        wr.catalog.table(database=database_name, table=table_name)
        return True
    except ClientError as e:
        return False

Amazon Redshift external schema query

An AWS Glue database is created for each Amazon AppFlow connector profile that is present in the preceding configuration. The objects that are loaded from Salesforce into Amazon S3 are registered as tables in the Data Catalog under the corresponding database. To link the database in the Data Catalog with an external Amazon Redshift schema, run the following query:

CREATE EXTERNAL SCHEMA ${connector_profile_name}_ext from data catalog
database '${appflow_connector_profile_name}'
iam_role 'arn:aws:iam::${AWS_ACCOUNT_ID}:role/RedshiftSpectrumRole'
region 'eu-west-1';

The specified iam_role value must be an IAM role created ahead of time and must have the appropriate access policies specified to query the Amazon S3 location.

Now, all the tables available in the Data Catalog can be queried using SQL locally in Amazon Redshift Spectrum.

Amazon AppFlow Salesforce destination

Roche trains and invokes ML models using data found in the Amazon Redshift data warehouse. After the ML models are complete, the results are pushed back into Salesforce. Through the use of Amazon AppFlow, we can achieve the data transfer without writing any custom code. The schema of the results must match the schema of the corresponding Salesforce object, and the format of the results must be written in either JSON lines or CSV format in order to be written back into Salesforce.

AWS Glue Jobs

To source on-premises data feeds into the data lake, Roche has built a set of AWS Glue jobs in Python. There are various external sources including databases and APIs that are directly loaded into the raw zone S3 bucket. The AWS Glue jobs are run on a daily basis to load new data. The data that is loaded follows the partitioning scheme of YYYYMMDD format in order to more efficiently store and query datasets. The loaded data is then converted into Parquet format for more efficient querying and storage purposes.

Amazon EKS and KubeFlow

To deploy ML models on Amazon EKS, Roche uses Kubeflow on Amazon EKS. The use of Amazon EKS as the backbone infrastructure makes it easy to build, train, test, and deploy ML models and interact with Amazon Redshift as a data source.

Firewall Manager

As an added layer of security, Roche takes extra precautions through the use of Firewall Manager. This allows Roche to explicitly deny or allow inbound and outbound traffic through the use of stateful and stateless rule sets. This also enables Roche to allow certain outbound access to external websites and deny websites that they don’t want resources inside of their Amazon VPC to have access to. This is critical especially when dealing with any sensitive datasets to ensure that data is secured and has no chance of being moved externally.

CI/CD

All the infrastructure outlined in the architecture diagram was automated and deployed to multiple AWS Regions using a continuous integration and continuous delivery (CI/CD) pipeline with GitLab Runners as the orchestrator. The GitFlow model was used for branching and invoking automated deployments to the Roche AWS accounts.

Infrastructure as code and AWS CDK

Infrastructure as code (IaC) best practices were used to facilitate the creation of all infrastructure. The Roche team uses the Python AWS CDK to deploy, version, and maintain any changes that occur to the infrastructure in their AWS account.

AWS CDK project structure

The top level of the project structure in GitLab includes the following folders (while not limited to just these folders) in order to keep infrastructure and code all in one location.

To facilitate the various resources that are created in the Roche account, the deployment was broken into the following AWS CDK apps, which encompass multiple stacks:

  • core
  • data_lake
  • data_warehouse

The core app contains all the stacks related to account setup and account bootstrapping, such as:

  • VPC creation
  • Initial IAM roles and policies
  • Security guardrails

The data_lake app contains all the stacks related to creating the AWS data lake, such as:

  • Lake Formation setup and registration
  • AWS Glue database creation
  • S3 bucket creation
  • Amazon AppFlow flow creation
  • AWS Glue job setup

The data_warehouse app contains all the stacks related to setting up the data warehouse infrastructure, such as:

  • Amazon Redshift cluster
  • Load balancer to Amazon Redshift cluster
  • Logging

The AWS CDK project structure described was chosen to keep the deployment flexible and to logically group together stacks that relied on each other. This flexibility allows for deployments to be broken out by function and deployed only when truly required and needed. This decoupling of different parts of the provisioning maintains flexibility when deploying.

AWS CDK project configuration

Project configurations are flexible and extrapolated away as YAML configuration files. For example, Roche has simplified the process of creating a new Amazon AppFlow flow and can add or remove flows as needed simply by adding a new entry into their YAML configuration. The next time the GitLab runner deployment occurs, it picks up the changes on AWS CDK synthesis to generate a new change set with the new set of resources. This configuration and setup keeps things dynamic and flexible while decoupling configuration from code.

Network architecture

The following diagram illustrates the network architecture.

We can break down the architecture into the following:

  • All AWS services are deployed in two Availability Zones (except Amazon Redshift)
  • Only private subnets have access to the on-premises Roche environment
  • Services are deployed in backend subnets
  • Perimeter protection using AWS Network Firewall
  • A network load balancer publishes services to the on premises environment

Network security configurations

Infrastructure, configuration, and security are defined as code in AWS CDK, and Roche uses a CI/CD pipeline to manage and deploy them. Roche has an AWS CDK application to deploy the core services of the project: VPC, VPN connectivity, and AWS security services (AWS Config, Amazon GuardDuty, and AWS Security Hub). The VPC contains four network layers deployed in two Availability Zones, and they have VPC endpoints to access AWS services like Amazon S3, Amazon DynamoDB, and Amazon Simple Queue Service (Amazon SQS). They limit internet access using AWS Network Firewall.

The infrastructure is defined as code and the configuration is segregated. Roche performed the VPC setup by running the CI/CD pipeline to deploy their infrastructure. The configuration is in a specific external file; if Roche wants to change any value of the VPC, they need to simply modify this file and run the pipeline again (without typing any new lines of code). If Roche wants to change any configurations, they don’t want to have to change any code. It makes it simple for Roche to make changes and simply roll them out to their environment, making the changes more transparent and easier to configure. Traceability of the configuration is more transparent and it makes it simpler for approving the changes.

The following code is an example of the VPC configuration:

"test": {
        "vpc": {
            "name": "",
            "cidr_range": "192.168.40.0/21",
            "internet_gateway": True,
            "flow_log_bucket": shared_resources.BUCKET_LOGGING,
            "flow_log_prefix": "vpc-flow-logs/",
        },
        "subnets": {
            "private_subnets": {
                "private": ["192.168.41.0/25", "192.168.41.128/25"],
                "backend": ["192.168.42.0/23", "192.168.44.0/23"],
            },
            "public_subnets": {
                "public": {
                    "nat_gateway": True,
                    "publics_ip": True,
                    "cidr_range": ["192.168.47.64/26", "192.168.47.128/26"],
                }
            },
            "firewall_subnets": {"firewall": ["192.168.47.0/28", "192.168.47.17/28"]},
        },
        ...
         "vpc_endpoints": {
            "subnet_group": "backend",
            "services": [
                "ec2",
                "ssm",
                "ssmmessages",
                "sns",
                "ec2messages",
                "glue",
                "athena",
                "secretsmanager",
                "ecr.dkr",
                "redshift-data",
                "logs",
                "sts",
            ],
            "gateways": ["dynamodb", "s3"],
            "subnet_groups_allowed": ["backend", "private"],
        },
        "route_53_resolvers": {
            "subnet": "private",
        ...

The advantages of this approach are as follows:

  • No need to modify the AWS CDK constructor and build new code to change VPC configuration
  • Central point to manage VPC configuration
  • Traceability of changes and history of the configuration through Git
  • Redeploy all the infrastructure in a matter of minutes in other Regions or accounts

Operations and alerting

Roche has developed an automated alerting system if any part of the end-to-end architecture encounters any issues, focusing on any issues when loading data from AWS Glue or Amazon AppFlow. All logging is published to CloudWatch by default for debugging purposes.

The operational alerts have been built for the following workflow:

  1. AWS Glue jobs and Amazon AppFlow flows ingest data.
  2. If a job fails, it emits an event to a CloudWatch Events rule.
  3. The rule is triggered and invokes an Lambda function to send failure details to an Amazon Simple Notification Service (Amazon SNS) topic.
  4. The SNS topic has a Lambda subscriber that gets invoked:
    1. The Lambda function reads out specific webhook URLs from AWS Secrets Manager.
    2. The function fires off an alert to the specific external systems.
  5. The external systems receive the message and the appropriate parties are notified of the issue with details.

The following architecture outlines the alerting mechanisms built for the lake house platform.

Conclusion

The GTM (Go-To-Market) domain has been successful in enabling their business stakeholders, data engineers and data scientists providing a platform that is extendable to many use-cases that Roche faces. It is a key enabler and an accelerator for the GTM organization in Roche. Through a modern data platform, Roche is now able to better understand customers and ultimately create and deliver valuable services that meet their needs. It extends beyond health care professionals (HCPs) to a larger healthcare ecosystem. The platform and infrastructure in this blog help to support and accelerate both internal and external stakeholders in their decision-making processes through actionable insights.

The steps in this post can help you plan to build a similar modern data strategy using AWS managed services to ingest data from sources like Salesforce, automatically create metadata catalogs and share data seamlessly between the data lake and data warehouse, and create alerts in the event of an orchestrated data workflow failure. In part 2 of this post, you learn about how the data warehouse was built using an agile data modeling pattern and how ELT jobs were quickly developed, orchestrated, and configured to perform automated data quality testing.

Special thanks go to the Roche team: Joao Antunes, Krzysztof Slowinski, Krzysztof Romanowski, Bartlomiej Zalewski, Wojciech Kostka, Patryk Szczesnowicz, Igor Tkaczyk, Kamil Piotrowski, Michalina Mastalerz, Jakub Lanski, Chun Wei Chan, Andrzej Dziabowski for their project delivery and support with this post.


About The Authors

Dr. Yannick Misteli, Roche – Dr. Yannick Misteli is leading cloud platform and ML engineering teams in global product strategy (GPS) at Roche. He is passionate about infrastructure and operationalizing data-driven solutions, and he has broad experience in driving business value creation through data analytics.

Simon Dimaline, AWS – Simon Dimaline has specialised in data warehousing and data modelling for more than 20 years. He currently works for the Data & Analytics team within AWS Professional Services, accelerating customers’ adoption of AWS analytics services.

Matt Noyce, AWS – Matt Noyce is a Senior Cloud Application Architect in Professional Services at Amazon Web Services. He works with customers to architect, design, automate, and build solutions on AWS for their business needs.

Chema Artal Banon, AWS – Chema Artal Banon is a Security Consultant at AWS Professional Services and he works with AWS’s customers to design, build, and optimize their security to drive business. He specializes in helping companies accelerate their journey to the AWS Cloud in the most secure manner possible by helping customers build the confidence and technical capability.

A special Thank You goes out to the following people whose expertise made this post possible from AWS:

  • Thiyagarajan Arumugam – Principal Analytics Specialist Solutions Architect
  • Taz Sayed – Analytics Tech Leader
  • Glenith Paletta – Enterprise Service Manager
  • Mike Murphy – Global Account Manager
  • Natacha Maheshe – Senior Product Marketing Manager
  • Derek Young – Senior Product Manager
  • Jamie Campbell – Amazon AppFlow Product Manager
  • Kamen Sharlandjiev – Senior Solutions Architect – Amazon AppFlow
  • Sunil Jethwani Principal Customer Delivery Architect
  • Vinay Shukla – Amazon Redshift Principal Product Manager
  • Nausheen Sayed – Program Manager

Extract, prepare, and analyze Salesforce.com data using Amazon AppFlow, AWS Glue DataBrew, and Amazon Athena

Post Syndicated from Ramkumar Nottath original https://aws.amazon.com/blogs/big-data/extract-prepare-and-analyze-salesforce-com-data-using-amazon-appflow-aws-glue-databrew-and-amazon-athena/

As organizations embark on their data modernization journey, big data analytics and machine learning (ML) use cases are becoming even more integral parts of business. The ease for data preparation and seamless integration with third-party data sources is of paramount importance in order to gain insights quickly and make critical business decisions faster.

AWS Glue DataBrew is a visual data preparation tool that cleans and normalizes data without writing code. This reduces the time it takes to prepare data for analytics and ML by up to 80% compared to traditional approaches to data preparation.

Software as a service (SaaS) applications play a pivotal role in organizations’ analytics pipelines. This data is essential to include when performing analytics to get insights to make better business decisions. Amazon AppFlow is a fully managed integration service that helps you transfer SaaS data to your data lake securely.

Recently, DataBrew announced native console integration with Amazon AppFlow to connect to data from applications like Salesforce, Zendesk, Slack, ServiceNow, and other SaaS applications, and AWS services like Amazon Simple Storage Service (Amazon S3) and Amazon Redshift. With native integration with Amazon AppFlow, DataBrew is addressing both the challenges with data preparation and seamless integration with SaaS applications.

Salesforce is a popular and widely used customer relationship management (CRM) platform. It lets you store and manage prospect and customer information—like contact info, accounts, leads, and sales opportunities—in one central location. You can derive a lot of useful information by combining the prospect information stored in Salesforce with other structured and unstructured data in your data lake.

In this post, we walk you through how to extract data from Salesforce.com using the native integration that DataBrew has with Amazon AppFlow, prepare the data for your analytical use cases using DataBrew, store it in Amazon S3, and query it with Amazon Athena.

Architecture overview

The following diagram represents the flow described in this post. With the visual point-and-click interface in Amazon AppFlow, you create a new flow with Salesforce as source. You can either use an existing connection to Salesforce or create a new one. In DataBrew, while creating a dataset, you can choose the Amazon AppFlow flow as one of the sources to import the data for data preparation. After you perform the data preparation steps on sample data, you can save the steps as a recipe and automate the flow by creating a DataBrew job by selecting the dataset for the source and the newly created recipe for transformation. The transformed data is published to an S3 bucket. You can use an AWS Glue crawler to catalog that data and use Athena to query the data.

The workflow includes the following steps:

  1. Create an S3 bucket for the raw and transformed data.
  2. Create a connection to Salesforce.
  3. Create a flow to extract the data from Salesforce.com.
  4. Create a dataset and project.
  5. Prepare the data and create a recipe.
  6. Create a job to pull the data from the Amazon AppFlow flow, run the transformations, and load the data in Amazon S3.
  7. Create a crawler to catalog the data.
  8. Analyze data using Athena.

Prerequisites

To implement this solution, you need the following prerequisites:

Now that we have discussed the architecture of our solution, we present the step-by-step instructions.

Create an S3 bucket

To create an S3 bucket, complete the following steps:

  1. On the Amazon S3 console, choose Create bucket.
  2. Enter a name for your bucket; for example, databrew-appflow-data-prep-<your name>.
  3. Choose Create bucket.

Create a connection to Salesforce

If you already have a Salesforce connection created in Amazon AppFlow, you can skip this step. To create a new connection, complete the following steps:

  1. On the Amazon AppFlow console, choose Connections in the navigation pane.
  2. From the list of connectors, choose Salesforce.
  3. Choose Create connection.
  4. For Connection name, enter a name (for example, sfdc-appflow).
  5. Choose Continue.
  6. You’re redirected to a sign-in screen where you can log in to your Salesforce account. If you don’t have a Salesforce account, you can sign up for a developer account.
  7. Choose Allow to allow Amazon AppFlow to access your Salesforce account.

You can now see the new connection that was created.

Create a flow in Amazon AppFlow to extract data from Salesforce.com

To create a flow in Amazon AppFlow, complete the following steps:

  1. On the Amazon AppFlow console, choose Flows in the navigation pane.
  2. Choose Create flow.
  3. On the Specify flow details page, enter a name for the flow (for example, salesforce-data).
  4. Optionally, provide a description for the flow and tags.
  5. Choose Next.
  6. On the Configure flow page, for Source name¸ choose Salesforce.
  7. Choose the connection we created in the previous step.
  8. For Choose Salesforce object, choose the object you want to work with (for this post, we choose Opportunity).
  9. For Destination name, choose Amazon S3.
  10. For Bucket details, choose the bucket you created earlier.
  11. Optionally, provide a prefix (folder) where you want the data to land within the bucket.
  12. Under Additional settings, for Data transfer preference, select Aggregate all records.
  13. For Choose how to trigger the flow¸ select Run on demand.
  14. Choose Next.
  15. On the Source to destination field mapping page, for Source field name, select the fields you want to work with and choose Map fields directly.

Alternatively, you can choose Map all fields directly to map all the fields from the object.

  1. On the Add filters page, add any filters for the data you’re pulling.
  2. On the Review and create page, review all the details and choose Create flow.
  3. After the flow is created, choose Run flow to run the flow and verify whether the flow ran successfully.

Create a dataset and project in DataBrew

To create a dataset and DataBrew project, complete the following steps:

  1. On the DataBrew console, choose Projects in the navigation pane.
  2. Choose Create project.
  3. For Project name, enter a name (for example, databrew-appflow-integration).
  4. For Select a dataset, select New dataset.
  5. For Dataset name, enter a name (for example, databrew-sfdc).
  6. From the list of sources, choose Amazon AppFlow.
  7. Select the flow you created.
  8. Under Permissions, for Role name, choose Create new IAM role.
  9. For New IAM role suffix, enter a suffix (for example, appflow-databrew).
  10. Choose Create project.

After you create the project, data is loaded to DataBrew so that you can perform data preparation activities.

Prepare data and create a recipe

With DataBrew, you can choose from over 250 pre-built transformations to automate data preparation tasks, all without the need to write any code. In this post, we only discuss a few of them. For the full list of transformations, see Recipe step and function reference.

In this step, we split the CloseDate column to CloseYear, CloseMonth, and CloseDay. Then we flag the outliers in the Amount column.

  1. Duplicate the column CloseDate by choosing the column and choosing Duplicate.
  2. For Duplicate column name, enter a name.
  3. Choose Apply.
  4. Select the column you created and on the Clean menu, choose Replace value or pattern.
  5. For Value to be replaced, select Enter custom value and enter -.
  6. Choose Apply.

This replaces – with empty values.

  1. Select the modified column and on the options menu, choose Split menu and At positions from beginning.
  2. For Position from the beginning¸ enter 4.
  3. Choose Apply.

This creates a new column with the year value.

Next, we split the column at position 2 from the beginning in CloseDate_copy_2 to extract month and day.

  1. On the options menu, choose Split column.
  2. Rename the CloseDate_copy_1 column to CloseYear and choose Apply.
  3. Repeat the steps to rename the other two newly created columns to CloseMonth and CloseDay.
  4. Select the Amount column and on the Outliers menu, choose Flag outliers.
  5. For Standard deviation threshold, enter 3.
  6. Under Outlier actions, select Flag outliers.
  7. Choose Apply.

You can see that an additional column got added and the outliers are flagged.

All the steps that we performed so far are recorded under Recipe.

  1. Under Recipe, choose Publish.
  2. For Version description, enter a description.
  3. Choose Publish.

This saves the recipe for future use.

Create a DataBrew job

To create a DataBrew job, complete the following steps:

  1. On the DataBrew console, choose Jobs in the navigation pane.
  2. Choose Create job.
  3. For Job name, enter a name (for example, databrew-appflow-job).
  4. For Select a dataset, choose the dataset we created (databrew-sfdc).
  5. For Select a recipe, choose the recipe we created (databrew-appflow-integration-recipe).
  6. In the Job output settings section, for Output to, choose Amazon S3.
  7. For S3 location, enter the S3 path for the data (for example, s3://databrew-appflow-data-prep-<your name>/processed/).
  8. For Role name, choose the role with suffix databrew-appflow-role.
  9. Choose Create and run job.

Create a crawler to catalog the data

To create your AWS Glue crawler, complete the following steps:

  1. On the AWS Glue console, choose Crawlers.
  2. Choose Add crawler.
  3. For Crawler name¸ enter a name (for example, databrew-opportunity-data).
  4. Choose Next.
  5. Under Specify crawler source type, keep the default options and choose Next.
  6. Under Add a data store, for Include path, choose the S3 bucket that we used for the processed data (for example, s3://databrew-appflow-data-prep-<your name>/processed).
  7. Choose Next.
  8. For Add another data, select No.
  9. Choose Next.
  10. Select Create an IAM role and provide a suffix for the role (for example, databrew).
  11. For Frequency, choose Run on demand.
  12. On the next page, choose Add database.
  13. Enter a database name (for example, databrew-appflow).
  14. Choose Create.
  15. For Prefix, enter opportunity_.
  16. Choose Next.
  17. Review the details and choose Finish.
  18. After the crawler is created, select it and choose Run crawler.

The crawler catalogs the data that we uploaded to Amazon S3 after processing using DataBrew.

Analyze data using Athena

When the crawler is complete, we can analyze the data with Athena.

  1. On the AWS Glue console, choose the database we created.
  2. Under Data catalog, choose Tables in databrew-appflow.

You can see a table named opportunity_processed, which points to the Amazon S3 location where the processed data was landed.

  1. Select the table name.
  2. On the Action menu, choose View data.

A pop-up may appear to let you know that you’re going to the Athena console.

  1. Choose Preview data.

If this is the first time using Athena in this account, you have to set the query result location.

  1. Run a query in Athena.

You should be able to see the data in Amazon S3.

You can perform further analysis by running more queries on the data. The following query returns the expected revenue for based on various closeyear and closemonth combinations of opportunities:

SELECT closeyear, closemonth, SUM(expectedrevenue) FROM  "AwsDataCatalog"."databrew-appflow"."opportunity_processed" 
GROUP BY  closeyear, closemonth;

Clean up

You may want to clean up the demo environment when you are done. To do so, delete the following resources that were created as part of this post:

  • S3 bucket (databrew-appflow-data-prep-<your name>)
  • Connection to Salesforce in Amazon AppFlow (sfdc-appflow)
  • Flow in Amazon AppFlow (salesforce-data)
  • Dataset (databrew-sfdc), project (databrew-appflow-integration), and job (databrew-appflow-job) in DataBrew
  • AWS Glue Data Catalog database (databrew-appflow)
  • IAM role (AWSGlueDataBrewServiceRole-appflow-databrew)

Conclusion

In this post, we walked you through how to extract data from Salesforce.com using the native integration that DataBrew has with Amazon AppFlow. We also demonstrated how to prepare the data for analytical use cases using DataBrew, operationalize the data preparation steps by creating a recipe and use that in a DataBrew job, store the job result in Amazon S3, and query it with Athena.

If you have any questions or suggestions, please leave a comment.


About the Authors

Ramkumar Nottath is a Sr. Solutions Architect at AWS focusing on Analytics services. He enjoys working with various customers to help them build scalable, reliable big data and analytics solutions. His interests extend to various technologies such as analytics, data warehousing, streaming, and machine learning. He loves spending time with his family and friends.

Srikanth Sopirala is a Principal Analytics Specialist Solutions Architect at AWS. He is a seasoned leader with over 20 years of experience, who is passionate about helping customers build scalable data and analytics solutions to gain timely insights and make critical business decisions. In his spare time, he enjoys reading, spending time with his family, and road cycling.

Embed Amazon AppFlow in your applications using APIs

Post Syndicated from Jobin George original https://aws.amazon.com/blogs/big-data/embed-amazon-appflow-in-your-applications-using-apis/

Software as a service (SaaS) based applications are in demand today, and organizations have a growing need to adopt them in order to make data-driven decisions. As such SaaS adoption grows, extracting data from various SaaS applications and running analytics across them gets complicated. You have to rely on a set of third-party tools to extract the data from such SaaS providers and transform it to do analysis on it. The growing dependency on licensed third parties to provide data integration increases the overall cost of ownership and maintenance requirements in terms of skill and infrastructure.

In this post, you learn how to use Amazon AppFlow APIs to create data flows to extract data from SaaS providers in order to extend the connectivity of applications. This is particularly relevant for customer applications or individual software vendors (ISVs) who natively don’t have the connectivity capability.

Amazon AppFlow is a fully managed integration service that enables you to securely transfer data between SaaS and cloud applications like Salesforce, Marketo, Slack, and ServiceNow, as well as AWS services like Amazon Simple Storage Service (Amazon S3) and Amazon Redshift, in just a few clicks.

Solution overview

ISVs and customers can embed a data flow solution into their application using Amazon AppFlow APIs and ensure fast and secure data integration between SaaS solutions and supported targets for supporting end-user use cases.

Integrating Amazon AppFlow into your application has the following advantages:

  • Seamless experience for end-users without having to toggle between multiple user interfaces (UIs)
  • No need for building custom connectors to help end-users ingest data from popular SaaS providers
  • No need to rely on expensive third-party tools, and you pay only for what you use

In this post, I walk you through some of the common steps to create a connection to a SaaS provider, create a data flow, and run the flow with API calls. I show how each step equates to steps in the Amazon AppFlow UI and how that can be embedded into your application’s UI to provide a seamless experience to your end-users.

You can embed Amazon AppFlow inside your application through two different methods:

  • Create and run flows in Amazon AppFlow under the end-user’s AWS account and ingest data into targets owned by end-users
  • Create and run flows in Amazon AppFlow under the ISV’s AWS account and ingest data into targets owned by end-users

Let’s look at those two approaches in detail.

Create and run Amazon AppFlow data flows in the end-user’s AWS account

In this scenario, the partner or ISV hosts an application (which doesn’t nessesarily have to run out of an AWS account) with a UI and needs to support data integration with third-party SaaS providers to bring data for downstream processing. The ISV can use Amazon Appflow APIs to provide features within their product or application that enable end-users to create flows to bring data directly from SaaS providers such as Salesforce, Slack, Google Analytics, and more into targets supported by Amazon AppFlow for downstream processing.

For example, let’s assume an ISV wants to provide extract, transform, and load (ETL) and ELT capabilities with Amazon Redshift as a supported target. The end-user needs to bring data from Salesforce and ServiceNow for analytics in Amazon Redshift, but for this use case, let’s assume that the ISV doesn’t have the functionality to connect to Salesforce or ServiceNow. To quickly provide this functionality, rather than building custom connectors, the ISV can simply use the Amazon AppFlow API, incorporate it in their UI, and provide a seamless experience to run data flows in the end-user’s AWS account and ingest data from Salesforce and ServiceNow into Amazon Redshift for downstream processing.

The following diagram illustrates the architecture of this solution.

The architecture shows how the ISV’s application is running on the ISV’s AWS account, and Amazon AppFlow is running on the end-user’s AWS account, extracting data from various sources and ingesting data into various targets supported. The API doesn’t have to be called from another AWS account. Later in this post, we provide the authentication mechanism as an example and discuss it based on the assumption that the end-user-facing application runs on AWS.

In this approach, because Amazon AppFlow is running inside the end-user’s AWS account, you need an AWS Identity and Access Management (IAM) role that has permission to list, create, and run the flow and connectors, and cross-account access to the ISV’s AWS account so the ISV can assume that role and control Amazon AppFlow.

Create and run Amazon AppFlow data flows in the ISV’s AWS account

In this scenario, the partner or ISV hosts a application running on AWS with a UI in which they want to support data integration with third-party SaaS providers to bring data for downstream processing. The ISV can use Amazon Appflow APIs to provide functionality within their product and application that enables end-users to create flows to bring data directly from SaaS providers such as Salesforce, Slack, Google Analytics, and more into targets supported by Amazon AppFlow for downstream processing.

For example, let’s assume an ISV provides an analytics capability on data in Amazon Redshift. The end-user needs to bring data from Salesforce and ServiceNow for analytics into Amazon Redshift, but the ISV doesn’t have the functionality to connect to Salesforce or ServiceNow. To quickly provide this functionality, rather than building custom connectors to fulfill the customer requirement, the ISV can simply use the Amazon AppFlow API, incorporate it in their UI, and provide a seamless experience to run data flows in the ISV’s AWS account and ingest data from Salesforce and ServiceNow into the end-user’s Amazon Redshift cluster for downstream analytics processing.

The following diagram illustrates this architecture.

The architecture shows how the ISV’s application is running, as well as how the Amazon AppFlow data flows are run out of the ISV’s AWS account. The access to authenticate and save extracted data into customer-owned data destinations needs to be provided to the ISV in this case. This can be done by creating cross-account roles or similar mechanisms, except for Amazon S3. Amazon AppFlow doesn’t support cross-account access to S3 buckets, in order to prevent unauthorized access.

Create an Amazon AppFlow data flow using an API

Now let’s look at a few basic steps to create and run an Amazon AppFlow data flow using an API. We also discuss how to implement those steps within an application’s UI and what it looks like when configuring the flow on the Amazon AppFlow console directly. Let’s focus on the first design we discussed, in which the ISV runs Amazon AppFlow data flows in the end-user’s AWS account.

Set up a cross-account role for Amazon AppFlow access

Before we begin, if the ISV implementing this solution is using an AWS account to run the application, we recommend setting up cross-account roles to gain access to create and run Amazon AppFlow data flows. For more information, see IAM tutorial: Delegate access across AWS accounts using IAM roles.

In this scenario, the ISV assumes the role using AWS Security Token Service (AWS STS) to create resources and control the Amazon AppFlow lifecycle. You can use the following Python code to assume roles in another account after it’s created. This function just needs the role ARN to be passed; the role has necessary permissions to manage Amazon AppFlow.

def assumed_role_session(role_arn: str, base_session: botocore.session.Session = None):
    base_session = base_session or boto3.session.Session()._session
    fetcher = botocore.credentials.AssumeRoleCredentialFetcher(
        client_creator = base_session.create_client,
        source_credentials = base_session.get_credentials(),
        role_arn = role_arn
        extra_args = { }
    )
    creds = botocore.credentials.DeferredRefreshableCredentials(
        method = 'assume-role',
        refresh_using = fetcher.fetch_credentials,
        time_fetcher = lambda: datetime.datetime.now(tzlocal())
    )
    botocore_session = botocore.session.Session()
    botocore_session._credentials = creds
    return boto3.Session(botocore_session = botocore_session)

Confirm that Amazon AppFlow has permission to use Amazon S3 as the target

As a next step, let’s make sure Amazon AppFlow has the proper access to the S3 bucket used as the target. Your S3 buckets must be in the same AWS Region as your Amazon AppFlow data flow. Amazon AppFlow doesn’t support cross-account access to S3 buckets, in order to prevent unauthorized access.

Complete the following steps to add a bucket policy and make sure it’s attached to the S3 bucket that you’re using as a target. In the example code, we use the bucket name appflow-sfdata:

{
    "Version": "2008-10-17",
    "Statement": [ {
            "Effect": "Allow",
            "Principal": {
                "Service": "appflow.amazonaws.com"
            },
            "Action": [
                "s3:PutObject",
                "s3:AbortMultipartUpload",
                "s3:ListMultipartUploadParts",
                "s3:ListBucketMultipartUploads",
                "s3:GetBucketAcl",
                "s3:PutObjectAcl" ],
            "Resource": ["arn:aws:s3:::appflow-sfdata",
                "arn:aws:s3:::appflow-sfdata/*"
            ]}]
}

Provide a list of connector types available before creating a connection

After the cross-account role is assumed and the application has permission to create and manage Amazon AppFlow data flows, navigate to a page where you can list and select the type of connectors vended by Amazon AppFlow. You could also skip this step and create connections as part of the create flow step, which gives you the option to set up connections from the same page. But providing the selection option lets you manage connections effectively.

You can use the following code snippet to list the connectors available in Amazon AppFlow. Let’s assume that the IAM role we’re assuming belongs to user account 999999999999 and is named appflow-remote-role.

session = stsAssumeRole.assumed_role_session('arn:aws:iam::999999999999:role/appflow-remote-role')
client = session.client('appflow')
def describe_connector(event, context):
    response = client.describe_connectors( connectorTypes=[])
    return {'response': response['connectorConfigurations']}

The following screenshot shows the response to this step; you can display those options for users to choose from.

The following screenshot shows how this part is implemented on the Amazon AppFlow console.

Create an Amazon AppFlow connection

Now it’s time to create a connection in Amazon AppFlow using the API. After the connector types are listed, you can provide a page with text boxes that accept credentials required for that specific connector type.

Let’s see how to create a Salesforce connector. If you want to test it out, make sure you have created a global connected app in Salesforce. Make sure you have the consumer key and consumer secret for your application and the InstanceURL value for your Salesforce instance. We use that later in the configuration.

We recommend using Amazon AppFlow to manage and refresh the credentials you need to establish connectivity because after initial credentials are stored in AWS Secrets Manager by Amazon AppFlow, it automatically takes care of refreshing accessToken. For that, you have to implement the OAuth 2.0 web server flow by authenticating with code grant type. After it’s authenticated, store the clientId and clientSecret into Secrets Manager and pass its ARN as well as the accessToken and refreshToken values you received while creating the flow using the Amazon AppFlow API. After Amazon AppFlow has this information, it automatically takes care of refreshing the credentials and provides uninterrupted connectivity while your flow is running. See the following code snippet:

def create_connector ():
    response = client.create_connector_profile(
    connectorProfileName='SalesForceAPI',
    connectorType='Salesforce',
    connectionMode='Public',
    connectorProfileConfig={
        'connectorProfileProperties': {'Salesforce': {'instanceUrl': 'https://xx-dev-my.salesforce.com','isSandboxEnvironment': False}},
        'connectorProfileCredentials': {'Salesforce': {
            'clientCredsARN’: 'ARN of your Secrets Manger with ClientID and Secret'
            'accessToken': accessToken,
            'refreshToken': 'refreshToken' }}
    })

To make the customer experience better, this should be wrapped within a UI to capture the required input parameters for the connection being created. For more information about setting up permissions between services, see Connection instructions.

If you’re not using the Amazon AppFlow capability to refresh the credentials, make sure you have a logic to keep the accessToken refreshed to have uninterrupted connectivity to Salesforce while using this connection in a scheduled flow. Ideally, you should have a UI that accepts all these parameters from the customer, stores it in Secrets Manager, and passes it into the API code.

The following sample code grants type password and manages refreshing the connection’s accessToken before you run the flow each time. For more information about how it works, see OAuth 2.0 Username-Password Flow for Special Scenarios. In case of a scheduled flow, the Amazon AppFlow system triggers it, so you don’t have an opportunity to pass a new accessToken after the flow is up and running. However, you could invoke update_connector_profile() at regular intervals to make sure the flows can run without interruption by updating the connection’s accessToken. For more information about creating security tokens, client IDs, and secrets, see Getting the Security Token for Your Salesforce Account and Generating a Client ID and Client Secret Key for Salesforce Connections.

def token_gen(clientId,):
    SecretClient = boto3.client('secretsmanager', region_name='us-west-2')
    get_secret_value_response = SecretClient.get_secret_value(SecretId='AppFlowSalesforce')
    secret = get_secret_value_response['SecretString']
    secret = json.loads(secret)
    clientId = secret.get('clientId')
    clientSecret = secret.get('clientSecret')
    username = secret.get('SFLoginEmail')
    password = secret.get('SFPassword+SeurityToken')
    access_token_url = 'https://login.salesforce.com/services/oauth2/token'
    data = {'grant_type': 'password',
                'client_id' : clientId,
                'client_secret' : clientSecret,
                'username': SFLoginEmail,
                'password': password
            }
    headers = { 'content-type': 'application/x-www-form-urlencoded'}
    req = requests.post(access_token_url, data=data, headers=headers)
    response = req.json()
    accessToken = response['access_token']
    return accessToken

After you create the connection, Amazon AppFlow validates the connection details you provide. If an exception occurs, it returns the same and doesn’t create the connection. The Amazon AppFlow create_connector_profile() API sends a response back with the ARN for the connector profile just created.

Create an Amazon AppFlow data flow

You can create a connection as part of the create flow UI, similar to how the Amazon AppFlow UI does it, but for this post I walk you through each step. To create a flow, you may need to call multiple APIs to list the connections of a particular type, list the entities specific to that connector, and even describe the fields specific to an entity.

Let’s walk through how to do that, in order to provide a great experience for the end-users of your application. Our flow collects data from an account object in Salesforce (the fields Id, Name, and CreatedDate) and stores it in a specific S3 bucket. You can use the following Python code to call the create_flow() API in Amazon AppFlow:

def createSF_flow ():
    response = client.create_flow(
    flowName='Salesforce2s3',
    description='Pull Data From Salesforce to Amazon s3',
    triggerConfig={'triggerType': 'OnDemand'},
    sourceFlowConfig={
        'connectorType': 'Salesforce',
        'connectorProfileName': 'SalesForceAPI',
        'sourceConnectorProperties': {'Salesforce': {'object': 'Account'}}
    },
    destinationFlowConfigList=[
        {'connectorType': 'S3',
            'connectorProfileName': 'S3',
            'destinationConnectorProperties': {
                'S3': {
                    'bucketName': 'appflow-sfdata',
                    's3OutputFormatConfig': {'fileType':'JSON'}
                    ##'aggregationConfig': {'aggregationType': 'None'}}}}
                    }}}],
    tasks=[
        {'sourceFields': ['Id','Name','CreatedDate'],
            "taskProperties": {"DATA_TYPE":"string"},
            'destinationField': 'Id',
            'connectorOperator':{'Salesforce': 'PROJECTION'},
            'taskType': 'Filter',
        },
        {'sourceFields': ['CreatedDate'],
            'destinationField': 'CreatedDate',
            'connectorOperator':{'Salesforce': 'BETWEEN'},
            'taskType': 'Filter',
            "taskProperties": {"DATA_TYPE": "datetime","LOWER_BOUND": "1609506000000","UPPER_BOUND": "1612098000000"}
        },{'sourceFields': ['Id'],
            'destinationField': 'Id',
            "taskProperties": {"DATA_TYPE":"string"},
            'taskType': 'Map'
        },{'sourceFields': ['Name'],
            'destinationField': 'Name',
            "taskProperties": {"DATA_TYPE":"string"},
            'taskType': 'Map'
        },{'sourceFields': ['CreatedDate'],
            'destinationField': 'CreatedDate',
            "taskProperties": {"DATA_TYPE":"datetime"},
            'taskType': 'Map'
        }])
    return {
        'Flow ARN': response['flowArn'],
        "Flow Status": response['flowStatus']
         }

You receive the following response:

{
  "Flow ARN": "arn:aws:appflow:us-west-2: 999999999999:flow/Salesforce2s3",
  "Flow Status": "Active"
}

To create the flow, we need to give dynamic inputs.

For ease of creating flows, users should be able to list the connection type available for both source and target. The line of code from earlier has Salesforce as a source and Amazon S3 as a target hard-coded, but we should make it dynamic and easy for customers using other API methods.

describe_connectors() helps you describe the connectors vended by Amazon AppFlow for specified connector types. If you don’t specify a connector type, this operation describes all the connectors vended by Amazon AppFlow. Also, describe_connector_profiles()returns details of connector profiles created and available already. You can use these to list types of connectors similar to what Amazon AppFlow does in its UI (see the following screenshot).

In the preceding flow for the target, you can simply list S3 buckets that are available.

Start, stop, and delete an Amazon AppFlow data flow

After you create the Amazon AppFlow data flow, you can use the start_flow() API to start the flow. To do so, simply pass the flowName:

def startSFFlow (event, context):
    response = client.start_flow(flowName='Salesforce2s3')
    return {
        'Flow ARN': response['flowArn'],
        "Flow Status": response['flowStatus']
         }

With the preceding call, the Amazon AppFlow start_flow API runs the flow based on how you configured it—either to run on demand, on a schedule, or by an event. For scheduled and event-triggered flows, this operation activates the flow. In the preceding example flow, we made it on demand and therefore it runs each time you call start_flow.

To stop the flow, invoke stop_flow(), which deactivates the existing flow. For on-demand flows, like the one we created earlier, this operation returns an unsupportedOperationException error message. For scheduled and event-triggered flows, the following operation deactivates the flow:

def stopSFFlow (event, context):
    response = client.stop_flow(flowName='Salesforce2s3')
    return {
        'Flow ARN': response['flowArn'],
        "Flow Status": response['flowStatus'] }

You need to invoke delete_flow(), which enables your application to delete an existing flow. Before deleting the flow, Amazon AppFlow validates the request by checking the flow configuration and status. You can delete flows one at a time. See the following code:

def deleteSFFlow (event, context):
    response = client.stop_flow(flowName='Salesforce2s3')

You can also update your connection and flow with update_connector_profile() and update_flow().

With these APIs, you can use Amazon AppFlow to build delightful experiences for your customers by extending connectivity to multiple SaaS applications.

Conclusion

In this post, you learned about using Amazon AppFlow APIs to create an experience in your application to connect and extract data from SaaS providers by running it either in your AWS account or the end-user’s account, depending on how you want to implement the solution. This approach helps you no longer depend on third-party connector providers to extract data from SaaS providers, and therefore reduces the overall cost and complications of developing your application. The sample Python API code is available on GitHub. For additional details, refer to the Amazon AppFlow Boto3 API documentation or the Amazon AppFlow API Reference.


About the Author

Jobin George is a Big Data Solutions Architect with more than a decade of experience with designing and implementing large scale Big Data and Analytics solutions. He provides technical guidance, design advice and thought leadership to some of the key AWS customers and Big Data partners.

 

 

Vinay Kondapi is Head of Product for Amazon AppFlow. He specializes in application and data integration, scaling strategy for connectivity between SaaS products and AWS

Hydrate your data lake with SaaS application data using Amazon AppFlow

Post Syndicated from Ninad Phatak original https://aws.amazon.com/blogs/big-data/hydrate-your-data-lake-with-saas-application-data-using-amazon-appflow/

Organizations today want to make data-driven decisions. The data could lie in multiple source systems, such as line of business applications, log files, connected devices, social media, and many more. As organizations adopt software as a service (SaaS) applications, data becomes increasingly fragmented and trapped in different “data islands.” To make decision-making easier, organizations are building data lakes, which is a centralized repository that allows you to store all your structured and unstructured data at any scale. You can store your data as is, without having to first structure the data, and run different types of analytics—from dashboards and visualizations to big data processing, ad hoc analytics, and machine learning (ML) to guide better decisions.

AWS provides services such as AWS Glue, AWS Lake Formation, Amazon Database Migration Service (AWS DMS), and many third-party solutions on AWS Marketplace to integrate data from various source systems into the Amazon Simple Storage Service (Amazon S3) data lake. If you’re using SaaS applications like Salesforce, Marketo, Slack, and ServiceNow to run your business, you may need to integrate data from these sources into your data lake. You likely also want to easily integrate these data sources without writing or managing any code. This is precisely where you can use Amazon AppFlow.

Amazon AppFlow is a fully managed integration service that enables you to securely transfer data between SaaS applications like Salesforce, Marketo, Slack, and ServiceNow and AWS services like Amazon S3 and Amazon Redshift. With Amazon AppFlow, you can run data flows at nearly any scale at the frequency you choose—on a schedule, in response to a business event in real time, or on demand. You can configure data transformations such as data masking and concatenation of fields as well as validate and filter data (omitting records that don’t fit a criteria) to generate rich, ready-to-use data as part of the flow itself, without additional steps. Amazon AppFlow automatically encrypts data in motion, and optionally allows you to restrict data from flowing over the public internet for SaaS applications that are integrated with AWS PrivateLink, reducing exposure to security threats. For a complete list of all the SaaS applications that can be integrated with Amazon AppFlow, see Amazon AppFlow integrations.

In this post, we look at how to integrate data from Salesforce into a data lake and query the data via Amazon Athena. Amazon AppFlow recently announced multiple new capabilities such as availability of APIs and integration with AWS CloudFormation. We take advantage of these new capabilities and deploy the solution using a CloudFormation template.

Solution architecture

The following diagram depicts the architecture of the solution that we deploy using AWS CloudFormation.

As seen in the diagram, we use Amazon AppFlow to integrate data from Salesforce into a data lake on Amazon S3. We then use Athena to query this data with the table definitions residing in the AWS Glue Data Catalog.

Deploy the solution with AWS CloudFormation

We use AWS CloudFormation to deploy the solution components in your AWS account. Choose an AWS Region for deployment where the following services are available:

  • Amazon AppFlow
  • AWS Glue
  • Amazon S3
  • Athena

You need to meet the following prerequisites before deploying the solution:

  • Have a Salesforce account with credentials authorized to pull data using APIs.
  • If you’re deploying the stack in an account using the Lake Formation permission model, validate the following settings:
    • The AWS Identity and Access Management (IAM) user used to deploy the stack is added as a data lake administrator under Lake Formation, or the IAM user used to deploy the stack has IAM privileges to create databases in the AWS Glue Data Catalog.
    • The Data Catalog settings under Lake Formation are configured to use only IAM access control for new databases and new tables in new databases. This makes sure that all access to the newly created databases and tables in the Data Catalog are controlled solely using IAM permissions. The following screenshot shows the Data catalog settings page on the Lake Formation console, where you can set these permissions.

These Lake Formation settings are required so that all permissions to the Data Catalog objects are controlled using IAM only.

Although you need these Lake Formation settings for the CloudFormation stack to deploy properly, in a production setting we recommend you use Lake Formation to govern access to the data in the data lake. For more information about Lake Formation, see What Is AWS Lake Formation?

We now deploy the solution and the following components:

  • An Amazon AppFlow flow to integrate Salesforce account data into Amazon S3
  • An AWS Glue Data Catalog database
  • An AWS Glue crawler to crawl the data pulled into Amazon S3 so that it can be queried using Athena.
  1. On the Amazon AppFlow console, on the Connections page, choose Create connection.
  2. For Connection name, enter a name for your connection.
  3. Choose Continue.

You’re redirected to the Salesforce login page, where you enter your Salesforce account credentials.

  1. Enter the appropriate credentials and grant OAuth2 access to the Amazon AppFlow client in the next step, after which a new connector profile is set up in your AWS account.
  2. To deploy the remaining solution components, choose Launch Stack:
  3. For Stack name, enter an appropriate name for the CloudFormation stack.
  4. For Parameters, enter the name of the Salesforce connection you created.
  5. Choose Next.
  6. Follow through the CloudFormation stack creation wizard, leaving rest of the default values unchanged.
  7. On the final page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  8. Choose Create stack.
  9. Wait for the stack status to change to CREATE_COMPLETE.
  10. On the Outputs tab of the stack, record the name of the S3 bucket.

Run the flow

The CloudFormation stack has deployed a flow named SFDCAccount. Open the flow to see the configuration. The flow has been configured to do the following:

  • Pull the account object from your Salesforce account into a S3 bucket. The flow pulls certain attributes from the object in Parquet format.
  • Mask the last five digits of the phone number associated with the Salesforce account.
  • Build a validation on the Account ID field that ignores the record if the value is NULL.

Make sure that all these attributes pulled by the flow are part of your account object in Salesforce. Make any additional changes that you may want to the flow and save the flow.

  1. Run the flow by choosing Run flow.
  2. When the flow is complete, navigate to the S3 bucket created by the CloudFormation stack to confirm its contents.

The Salesforce account data is stored in Parquet format in the SFDCData/SFDCAccount/ folder in the S3 bucket.

  1. On the AWS Glue console, run the crawler AppFlowGlueCrawler.

This crawler has been created by the CloudFormation stack and is configured to crawl the S3 bucket and create a table in the appflowblogdb database in the Data Catalog.

When the crawler is complete, a table named SFDCAccount exists in the appflowblogdb database.

  1. On the Athena console, run the following query:
    Select * from appflowblogdb.SFDCAccount limit 10;

The output shows the data pulled by the Amazon AppFlow flow into the S3 bucket.

Clean up

When you’re done exploring the solution, complete the following steps to clean up the resources deployed by AWS CloudFormation:

  1. Empty the S3 bucket created by the CloudFormation stack.
  2. Delete the CloudFormation stack.

Conclusion

In this post, we saw how you can easily set up an Amazon AppFlow flow to integrate data from Salesforce into your data lake. Amazon Appflow allows you to integrate data from many other SaaS applications into your data lake. After the data lands in Amazon S3, you can take it further for downstream processing using services like Amazon EMR and AWS Glue. You can then use the data in the data lake for multiple analytics use cases ranging from dashboards to ad hoc analytics and ML.


About the Authors

Ninad Phatak is a Principal Data Architect at Amazon Development Center India. He specializes in data engineering and datawarehousing technologies and helps customers architect their analytics use cases and platforms on AWS.

 

 

 

Vinay Kondapi is Head of product for Amazon AppFlow. He specializes in Application and data integration with SaaS products at AWS.

 

 

 

Integrating Datadog data with AWS using Amazon AppFlow for intelligent monitoring

Post Syndicated from Gopalakrishnan Ramaswamy original https://aws.amazon.com/blogs/big-data/integrating-datadog-data-with-aws-using-amazon-appflow-for-intelligent-monitoring/

Infrastructure and operation teams are often challenged with getting a full view into their IT environments to do monitoring and troubleshooting. New monitoring technologies are needed to provide an integrated view of all components of an IT infrastructure and application system.

Datadog provides intelligent application and service monitoring by bringing together data from servers, databases, containers, and third-party services in the form of a software as a service (SaaS) offering. It provides operations and development professionals the ability to measure application and infrastructure performance, visualize metrics with the help of a unified dashboard and create alerts and notifications.

Amazon AppFlow is a fully managed service that provides integration capabilities by enabling you to transfer data between SaaS applications like Datadog, Salesforce, Marketo, and Slack and AWS services like Amazon Simple Storage Service (Amazon S3) and Amazon Redshift. It provides capabilities to transform, filter, and validate data to generate enriched and usable data in a few easy steps.

In this post, I walk you through the process of extracting log data from Datadog, using Amazon AppFlow and storing it in Amazon S3, and querying it with Amazon Athena.

Solution overview

The following diagram shows the flow of our solution.

The following diagram shows the flow of our solution.

The Datadog Agent is a lightweight software that can be installed in many different platforms, either directly or as a containerized version. It collects events and metrics from hosts and sends them to Datadog. Amazon AppFlow extracts the log data from Datadog and stores it in Amazon S3, which is then queried using Athena.

To implement the solution, you complete the following steps:

  1. Install and configure the Datadog Agent.
  2. Create a new Datadog application key.
  3. Create an Amazon AppFlow connection for Datadog.
  4. Create a flow in Amazon AppFlow.
  5. Run the flow and query the data.

Prerequisites

The walkthrough requires the following:

  • An AWS account
  • A Datadog account

Installing and configuring the Datadog Agent

The Datadog Agent is lightweight software installed on your hosts. With additional setup, the Agent can report live processes, logs, and traces. The Agent needs an API key, which is used to associate the Agent’s data with your organization. Complete the following steps to install and configure the Datadog Agent:

  1. Create a Datadog account if you haven’t already.
  2. Login to your account.
  3. Under Integrations, choose APIs.
  4. Copy the API key.
  5. Download the Datadog Agent software for the selected platform.
  6. Install the Agent on the hosts using the API key you copied.

Collecting logs is disabled by default in Datadog Agent. To enable Agent log collection and configure a custom log collection, perform the following steps on your host:

  1. Update the Datadog Agent’s main configuration file (datadog.yaml) with the following code:
    logs_enabled: true

In Windows this file is in C:\ProgramData\Datadog.

  1. Create custom log collection by customizing the conf.yaml file.

For example in Windows this file would be in the path C:\ProgramData\Datadog\conf.d\win32_event_log.d. The following code is a sample entry in the conf.yaml file that enables collection of Windows security events:

logs:
  - type: windows_event
    channel_path: Security
    source: Security
    service: windowsOS
    sourcecategory: windowsevent

Getting the Datadog application key

The application keys in conjunction with your organization’s API key give you full access to Datadog’s programmatic API. Application keys are associated with the user account that created them. The application key is used to log all requests made to the API. Get your application key with the following steps:

  1. Login into your Datadog account.
  2. Under Integrations, choose APIs.
  3. Expand Application Keys.
  4. For Application key name, enter a name.
  5. Choose Create Application key.

Creating an Amazon AppFlow connection for Datadog

A connection defines the source or destination to use in a flow. To create a new connection for Datadog, complete the following steps:

  1. On the Amazon AppFlow console, in the navigation pane, choose Connections. 
  2. For Connectors, choose Datadog.
  3. Choose Create Connection.
  4. For API key and Application Key, enter the keys procured from the previous steps.
  5. For Connection Name, enter a name; for example, myappflowconnection.
  6. Choose Connect.

Choose Connect.

Creating a flow in Amazon AppFlow

After you create the data connection, you can create a flow that uses the connection and defines the destination, data mapping, transformation, and filters.

Creating an S3 bucket

Create an S3 bucket as your Amazon AppFlow transfer destination.

  1. On the Amazon S3 console, choose Create bucket.
  2. Enter a name for your bucket; for example, mydatadoglogbucket.
  3. Ensure that Block all public access is selected.
  4. Enable bucket versioning and encryption (optional).
  5. Choose Create bucket.
  6. Enable Amazon S3 server access logging (optional).

Configuring the flow source

After you create the Datadog agent and the S3 bucket, complete the following steps to create a flow:

  1. On the Amazon AppFlow console, in the navigation pane, choose Flows.
  2. Choose Create flow.
  3. For Flow name, enter a name for your flow; for example mydatadogflow.
  4. For Source name, choose Datadog.
  5. For Choose Datadog connection, choose the connection created earlier.
  6. For Choose Datadog object, choose Logs.

For Choose Datadog object, choose Logs.

Choosing a destination

In the Destination details section, provide the following information:

  1. For Destination name, Choose Amazon S3.
  2. For Bucket details, choose the name of the S3 bucket created earlier.

This step create a folder with the flow name you specified within the bucket to store the logs.

This step creates a folder with the flow name you specified within the bucket to store the logs.

Additional settings

You can provide additional settings for data format (JSON, CSV, Parquet), data transfer preference, filename preference, flow trigger and transfer mode. Leave all settings as default:

  • For Data format preference, choose JSON format.
  • For Data transfer preference, choose No aggregation.
  • For Filename preference, choose No timestamp.
  • For Folder structure preference, choose No timestamped folder.

Adding a flow trigger

Flows can be run on a schedule, based on an event or on demand. For this post, we choose Run on demand.

Mapping data fields

You can map manually or using a CSV file. This determines how data is transferred from source to destination. You can apply transformations like concatenation, masking, and truncation to the mappings.

  1. In the Map data fields section, for Mapping method, choose Manually map fields.
  2. For Source field name, choose Map all fields directly.
  3. Choose Next.Choose Next.

Validation

You can add validation to perform certain actions based on conditions on field values.

  1. In the Validations section, for Field name choose Content.
  2. For Condition, choose Values are missing or null.
  3. For Action, choose Ignore record.For Action, choose Ignore record.

Filters

Filters specify which records to transfer. You can add multiple filters with criterion. For the Datadog data source, it’s mandatory to specify filters for Date_Range and Query. The format for specifying filter query for metrics and logs are different.

  1. In the Add filters section, for Field name, choose Date_Range.
  2. For Condition, choose is between.
  3. For Criterion 1 and Criterion 2, enter start and end dates for log collection.
  4. Choose Add filter.
  5. For your second filter, for Field name, choose
  6. For Condition, enter host:<yourhostname> AND service:(windowsOS OR LinuxOS).
  7. Choose Save.

Choose Save.

The service names specified in the filter should have Datadog logs enabled (refer to the earlier step when you installed and configured the Datadog Agent).

The following are some examples of the filter Query for metrics:

  • load.1{*} by {host}
  • avg:system.cpu.idle{*}
  • avg:system.cpu.system{*}
  • avg:system.cpu.user{*}
  • avg:system.cpu.guest{*}
  • avg:system.cpu.user{host:yourhostname}

The following are some examples of the filter Query for logs:

  • service:servicename
  • host:myhostname
  • host:hostname1 AND service:(servicename1 OR servicename2) 

Running the Flow and querying the data

If a flow is based on a trigger, you can activate or deactivate it. If it’s on demand, it must be run each time data needs to be transferred. When you run the flow, the logs or metrics are pulled into files residing in Amazon S3. The data is in the form of a nested JSON in this example. Use AWS Glue and Athena to create a schema and query the log data.

Querying data with Athena

When the Datadog data is in AWS, there are a host of possibilities to store, process, integrate with other data sources, and perform advanced analytics. One such method is to use Athena to query the data directly from Amazon S3.

  1. On the AWS Glue console, in the navigation pane, choose Databases.
  2. Choose Add database.
  3. For Database name, enter a name such as mydatadoglogdb.
  4. Choose Create.
  5. In the navigation pane, choose Crawlers.
  6. Choose Add Crawler.
  7. For Crawler name, enter a name, such as mylogcrawler.
  8. Choose Next.
  9. For Crawler source type, select Data stores.
  10. Choose Next.
  11. In the Add a data store section, choose S3 for the data store.
  12. Enter the path to the S3 folder that has the log files; for example s3://mydatadoglogbucket/logfolder/.
  13. In the Choose an IAM role section, select Create an IAM role and provide a name.
  14. For Frequency select Run on demand.
  15. In the Configure the crawler’s output section, for Database, select the database created previously.
  16. Choose Next.
  17. Review and choose Finish.
  18. When the crawler’s status changes to Active, select it and choose Run Crawler.

When the crawler finishes running, it creates the tables and populates them with data based on the schema it infers from the JSON log files.

  1. On the Athena console, choose Settings.
  2. Select an S3 bucket and folder where Athena results are stored.
  3. In the Athena query window, enter the following query:
    select * 
    from mydatadoglogdb.samplelogfile
    where content.attributes.level = 'Information'
    

  4. Choose Run Query.

This sample query gets all the log entries where the level is Information. We’re traversing a nested JSON object in the Athena query, simply with a dot notation.

Summary

In this post, I demonstrated how we can bring Datadog data into AWS. Doing so opens a host of opportunities to use the tools available in AWS to drive advance analytics and monitoring while integrating with data from other sources.

With Amazon AppFlow, you can integrate applications in a few minute, transfer data at massive scale, and enrich the data as it flows, using mapping, merging, masking, filtering, and validation. For more information about integrating SaaS applications and AWS, see Amazon AppFlow.


About the Author

Gopalakrishnan Ramaswamy is a Solutions Architect at AWS based out of India with extensive background in database, analytics, and machine learning. He helps customers of all sizes solve complex challenges by providing solutions using AWS products and services. Outside of work, he likes the outdoors, physical activities and spending time with friends and family.

Building Salesforce integrations with Amazon EventBridge and Amazon AppFlow

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/building-salesforce-integrations-with-amazon-eventbridge/

This post is courtesy of Den Delimarsky, Senior Product Manager, and Vinay Kondapi, Senior Product Manager.

The integration between Amazon EventBridge and Amazon AppFlow enables customers to receive and react to events from Salesforce in their event-driven applications. In this blog post, I show you how to set up the integration, and route Salesforce events to an AWS Lambda function for processing.

Amazon AppFlow is a fully managed integration service that enables you to securely transfer data between software as a service (SaaS) applications like Salesforce, Marketo, Slack, and ServiceNow, and AWS services like Amazon S3 and Amazon Redshift.

EventBridge SaaS integrations make it easier for customers to receive events from over 30 different SaaS providers. Salesforce is a popular SaaS provider among AWS customers, so it has been one of the most anticipated event sources for EventBridge. Customers want to build rich applications that can react to events that track campaigns, contracts, opportunities, and order changes.

The ability to receive these events allows you to build workflows where you can start a variety of processes. For example, you could notify a broad range of subscribers about the changes, or enrich the data with information from another service. Or you could route the event to an order delivery system.

Previously, to connect Salesforce to your application, you must write custom API polling code that routes events either directly to an application or to an event bus. With the Salesforce integration with EventBridge and Amazon AppFlow, the integration is built in minutes directly through the AWS Management Console, with no code required.

The solution outlined in this blog post is structured as follows:

Architecture overview

Setting up the event source

To set up the event source:

  1. Open the Amazon AppFlow console, and create a new flow. Choose Create flow button on the service landing page. Give your flow a unique name, and choose Next.Specify flow details
  2. In the Source name list, select Salesforce, and then choose Connect. Select the Salesforce environment you are using, and provide a unique connection name.Connect to Salesforce
  3. Choose Continue. When prompted, provide your Salesforce credentials. These are the credentials that are associated with the specific Salesforce environment selected in the previous step.
  4. Select Salesforce events from the list of available options for the flow, and choose the event that you want to route to EventBridge. This ensures that Amazon AppFlow can route specific events that are coming from Salesforce to an EventBridge event bus.Source details
  5. With the source set up, you can now specify the destination. In the Destination name list, select EventBridge.Destination name

To send Salesforce events to EventBridge, Amazon AppFlow creates a new partner event source that is associated with a partner event bus.

To create a partner event source:

  1. Select an existing partner event source, or create a new one by choosing the list of partner event sources.Destination details
  2. When creating a new event source, you can optionally customize the name, to make it easier for you to identify it later.Generate partner event source
  3. Choose an Amazon S3 bucket for large events. For events that are larger than 256 KB, Amazon AppFlow sends a URL for the S3 object to the event bus instead of the event payload.Large event handling
  4. Define a flow trigger, which determines when the flow is started. Because we are tracking events, we want to react to those as they come in. Using the default Run flow on event enables this scenario as changes occur in Salesforce.Flow trigger

With Amazon AppFlow, you can also configure data field mapping, validation rules, and filters. These enable you to enrich and modify event data before it is sent to the event bus.

Once you create the flow, you must activate the event source that you created. To complete this step:

  1. Open the EventBridge console.
  2. Associate a partner event source with an event bus by following the link in the Amazon AppFlow integration dialog box, or navigating directly to the partner event sources view. You can see a partner event source with a Pending state.Partner event source
  3. Select the event source and choose Associate with event bus.
  4. Confirm the settings and choose on Associate.Associate with an event bus
  5. Return to the Amazon AppFlow console, and open the flow you were creating. Choose Activate flow.Activate flow

Your integration is now complete, and EventBridge can start receiving Salesforce events from the configured flow.

Routing Salesforce events to Lambda function

The associated partner event bus receives all events of the configured type from the connected Salesforce accounts. Now your application can react to these events with the help of rules in EventBridge. Rules allow you to set conditions for event routing that determine what targets receive event payloads. You can learn more about this functionality in the EventBridge documentation.

To create a new rule:

  1. Go to the rules view in the EventBridge console, and choose Create rule.EventBridge Rules
  2. Provide a unique name and an optional description for your rule.
  3. Select the Event pattern option in the Define pattern section. With event pattern configuration, you can define parts of the event payload that EventBridge must look at to determine where to route the event.Define pattern
    For this exercise, start by capturing every Salesforce event that goes through the partner event bus. The only events routed through this bus are from the partner event source. In this case, it is Amazon AppFlow connected to Salesforce.
  4. Set the event matching pattern to Pre-defined pattern by service, with the service provider being All Events. The default setting allows you to receive all events that are coming through the partner event bus.Event matching pattern
  5. Select the event bus that the rule should be associated with. Choose Custom or partner event bus and select the event bus that you associated with the Amazon AppFlow event source. Every rule in EventBridge is associated with an event bus.Select event bus

When rules are triggered, the event can be routed to other AWS services. Additionally, every rule can have up to five different AWS targets. You can read more about available targets in the EventBridge documentation. For this blog post, we use an AWS Lambda function as a target for Salesforce events received from Amazon AppFlow.

To configure targets for your rule:

  1. From the list of targets, select Lambda function, and select an existing function. If you do not yet have a function available, you can create one in the AWS Lambda console.Select targets
  2. Choose Create. You have now completed the rule setup.

Now, Salesforce events that match the configured type are routed directly to a Lambda function in your account.

Testing the integration

To test the integration:

  1. Open the Lambda view in the AWS Management Console.
  2. Choose the function that is handling the events from EventBridge.
  3. In the Function code section, update the code to:
    exports.handler = async (event) => {
        console.log(event);
        const response = {
            statusCode: 200,
            body: JSON.stringify('Hello from Lambda!'),
        };
        return response;
    };
    

    Function code

  4. Choose Save.
  5. Open your Salesforce instance, and take an action that is associated with the event you configured earlier. For example, you could update a contract or create an order.
  6. Go back to your function in AWS Management Console, and choose the Monitoring tab.Lambda function monitoring tab
  7. Scroll to CloudWatch Logs Insights section.CloudWatch Logs Insights
  8. Choose the latest log stream. Make sure that the timestamp approximately matches the time when you triggered the action in Salesforce.
  9. Choose the log stream.
  10. Observe log events that contain Salesforce event data.

You have completed your first Salesforce integration with EventBridge and Amazon AppFlow. You are now able to build decoupled and highly scalable applications that integrate with Salesforce.

Conclusion

Building decoupled and scalable cross-service applications is more relevant than ever with requirements for high availability, consistency, and reliability. This blog post demonstrates a solution that connects Salesforce to an event-driven application that uses EventBridge and Amazon AppFlow to route events. The application uses events from Salesforce as a starting point for a custom processing workflow in a Lambda function.

To learn more about EventBridge, visit the EventBridge documentation or EventBridge Learning Path.

To learn more about Amazon AppFlow, visit the Amazon AppFlow documentation.