All posts by Rana Dutt

Publish and enrich real-time financial data feeds using Amazon MSK and Amazon Managed Service for Apache Flink

Post Syndicated from Rana Dutt original https://aws.amazon.com/blogs/big-data/publish-and-enrich-real-time-financial-data-feeds-using-amazon-msk-and-amazon-managed-service-for-apache-flink/

Financial data feeds are real-time streams of stock quotes, commodity prices, options trades, or other real-time financial data. Companies involved with capital markets such as hedge funds, investment banks, and brokerages use these feeds to inform investment decisions.

Financial data feed providers are increasingly being asked by their customers to deliver the feed directly to them through the AWS Cloud. That’s because their customers already have infrastructure on AWS to store and process the data and want to consume it with minimal effort and latency. In addition, the AWS Cloud’s cost-effectiveness enables even small and mid-size companies to become financial data providers. They can deliver and monetize data feeds that they have enriched with their own valuable information.

An enriched data feed can combine data from multiple sources, including financial news feeds, to add information such as stock splits, corporate mergers, volume alerts, and moving average crossovers to a basic feed.

In this post, we demonstrate how you can publish an enriched real-time data feed on AWS using Amazon Managed Streaming for Kafka (Amazon MSK) and Amazon Managed Service for Apache Flink. You can apply this architecture pattern to various use cases within the capital markets industry; we discuss some of those use cases in this post.

Apache Kafka is a high-throughput, low-latency distributed event streaming platform. Financial exchanges such as Nasdaq and NYSE are increasingly turning to Kafka to deliver their data feeds because of its exceptional capabilities in handling high-volume, high-velocity data streams.

Amazon MSK is a fully managed service that makes it easy for you to build and run applications on AWS that use Kafka to process streaming data.

Apache Flink is an opensource distributed processing engine, offering powerful programming interfaces for both stream and batch processing, with first-class support for stateful processing, event time semantics, checkpointing, snapshots and rollback. Apache Flink supports multiple programming languages, Java, Python, Scala, SQL, and multiple APIs with different level of abstraction, which can be used interchangeably in the same application.

Amazon Managed Service for Apache Flink is a fully managed, serverless experience in running Apache Flink applications. Customers can easily build real time Flink applications using any of Flink’s languages and APIs.

In this post, we use a real-time stock quotes feed from financial data provider Alpaca and add an indicator when the price moves above or below a certain threshold. The code provided in the GitHub repo allows you to deploy the solution to your AWS account. This solution was built by AWS Partner NETSOL Technologies.

Solution overview

In this solution, we deploy an Apache Flink application that enriches the raw data feed, an MSK cluster that contains the messages streams for both the raw and enriched feeds, and an Amazon OpenSearch Service cluster that acts as a persistent data store for querying the data. In a separate virtual private cloud (VPC) that acts as the customer’s VPC, we also deploy an Amazon EC2 instance running a Kafka client that consumes the enriched data feed. The following diagram illustrates this architecture.

Solution Architecture
Figure 1 – Solution architecture

The following is a step-by-step breakdown of the solution:

  1. The EC2 instance in your VPC is running a Python application that fetches stock quotes from your data provider through an API. In this case, we use Alpaca’s API.
  2. The application sends these quotes using Kafka client library to your kafka topic on MSK cluster. The kafka topic stores the raw quotes.
  3. The Apache Flink application takes the Kafka message stream and enriches it by adding an indicator whenever the stock price rises or declines 5% or more from the previous business day’s closing price.
  4. The Apache Flink application then sends the enriched data to a separate Kafka topic on your MSK cluster.
  5. The Apache Flink application also sends the enriched data stream to Amazon OpenSearch using a Flink connector for OpenSearch. Amazon Opensearch stores the data, and OpenSearch Dashboards allows applications to query the data at any point in the future.
  6. Your customer is running a Kafka consumer application on an EC2 instance in a separate VPC in their own AWS account. This application uses AWS PrivateLink to consume the enriched data feed securely, in real time.
  7. All Kafka user names and passwords are encrypted and stored in AWS Secrets Manager. The SASL/SCRAM authentication protocol used here makes sure all data to and from the MSK cluster is encrypted in transit. Amazon MSK encrypts all data at rest in the MSK cluster by default.

The deployment process consists of the following high-level steps:

  1. Launch the Amazon MSK cluster, Apache Flink application, Amazon OpenSearch Service domain, and Kafka producer EC2 instance in the producer AWS account. This step usually completes within 45 minutes.
  2. Set up multi-VPC connectivity and SASL/SCRAM authentication for the MSK cluster. This step can take up to 30 minutes.
  3. Launch the VPC and Kafka consumer EC2 instance in the consumer account. This step takes about 10 minutes.

Prerequisites

To deploy this solution, complete the following prerequisite steps:

  1. Create an AWS account if you don’t already have one and log in. We refer to this as the producer account.
  2. Create an AWS Identity and Access Management (IAM) user with full admin permissions. For instructions, refer to Create an IAM user.
  3. Sign out and sign back in to the AWS Management Console as this IAM admin user.
  4. Create an EC2 key pair named my-ec2-keypair in the producer account. If you already have an EC2 key pair, you can skip this step.
  5. Follow the instructions in ALPACA_README to sign up for a free Basic account at Alpaca to get your Alpaca API key and secret key. Alpaca will provide the real-time stock quotes for our input data feed.
  6. Install the AWS Command Line Interface (AWS CLI) on your local development machine and create a profile for the admin user. For instructions, see Set up the AWS Command Line Interface (AWS CLI).
  7. Install the latest version of the AWS Cloud Development Kit (AWS CDK) globally:
 npm install -g aws-cdk@latest

Deploy the Amazon MSK cluster

These steps create a new provider VPC and launch the Amazon MSK cluster there. You also deploy the Apache Flink application and launch a new EC2 instance to run the application that fetches the raw stock quotes.

  1. On your development machine, clone the GitHub repo and install the Python packages:
    git clone https://github.com/aws-samples/msk-powered-financial-data-feed.git
    cd msk-powered-financial-data-feed
    pip install -r requirements.txt

  2. Set the following environment variables to specify your producer AWS account number and AWS Region:
    export CDK_DEFAULT_ACCOUNT={your_AWS_account_no}
    export CDK_DEFAULT_REGION=us-east-1

  3. Run the following commands to create your config.py file:
    echo "mskCrossAccountId = <Your producer AWS account ID>" > config.py
    echo "producerEc2KeyPairName = '' " >> config.py
    echo "consumerEc2KeyPairName = '' " >> config.py
    echo "mskConsumerPwdParamStoreValue= '' " >> config.py
    echo "mskClusterArn = '' " >> config.py

  4. Run the following commands to create your alpaca.conf file:
    echo [alpaca] > dataFeedMsk/alpaca.conf
    echo ALPACA_API_KEY=your_api_key >> dataFeedMsk/alpaca.conf
    echo ALPACA_SECRET_KEY=your_secret_key >> dataFeedMsk/alpaca.conf

  5. Edit the alpaca.conf file and replace your_api_key and your_secret_key with your Alpaca API key.
  6. Bootstrap the environment for the producer account:
    cdk bootstrap aws://{your_AWS_account_no}/{your_aws_region}

  7. Using your editor or integrated development environment (IDE), edit the config.py file:
    1. Update the mskCrossAccountId parameter with your AWS producer account number.
    2. If you have an existing EC2 key pair, update the producerEc2KeyPairName parameter with the name of your key pair.
  8. View the dataFeedMsk/parameters.py file:
    1. If you are deploying in a Region other than us-east-1, update the Availability Zone IDs az1 and az2 accordingly. For example, the Availability Zones for us-west-2 would us-west-2a and us-west-2b.
    2. Make sure that the enableSaslScramClientAuth, enableClusterConfig, and enableClusterPolicy parameters in the parameters.py file are set to False.
  9. Make sure you are in the directory where the app1.py file is located. Then deploy as follows:
    cdk deploy --all --app "python app1.py" --profile {your_profile_name}

  10. Check that you now have an Amazon Simple Storage Service (Amazon S3) bucket whose name starts with awsblog-dev-artifacts containing a folder with some Python scripts and the Apache Flink application JAR file.

Deploy multi-VPC connectivity and SASL/SCRAM

Complete the following steps to deploy multi-VPC connectivity and SASL/SCRAM authentication for the MSK cluster:

  1. Set the enableSaslScramClientAuth, enableClusterConfig, and enableClusterPolicy parameters in the config.py file to True.
  2. Make sure you’re in the directory where the config.py file is located and deploy the multi-VPC connectivity and SASL/SCRAM authentication for the MSK cluster:

cdk deploy --all --app "python app1.py" --profile {your_profile_name}

This step can take up to 30 minutes.

  1. To check the results, navigate to your MSK cluster on the Amazon MSK console, and choose the Properties

You should see PrivateLink turned on, and SASL/SCRAM as the authentication type.

BDB-3696-multiVPC

  1. Copy the MSK cluster ARN.
  2. Edit your config.py file and enter the ARN as the value for the mskClusterArn parameter, then save the updated file.

Deploy the data feed consumer

Complete the steps in this section to create an EC2 instance in a new consumer account to run the Kafka consumer application. The application will connect to the MSK cluster through PrivateLink and SASL/SCRAM.

  1. Navigate to Parameter Store, a capability of AWS Systems Manager, in your producer account.
  2. Copy the value of the blogAws-dev-mskConsumerPwd-ssmParamStore parameter and update the mskConsumerPwdParamStoreValue parameter in the config.py file.
  3. Check the value of the parameter named blogAws-dev-getAzIdsParamStore and make a note of these two values.
  4. Create another AWS account for the Kafka consumer if you don’t already have one, and log in.
  5. Create an IAM user with admin permissions.
  6. Log out and log back in to the console using this IAM admin user.
  7. Make sure you are in the same Region as the Region you used in the producer account. Then create a new EC2 key pair named, for example, my-ec2-consumer-keypair, in this consumer account.
  8. Update the value of consumerEc2KeyPairName in your config.py file with the name of the key pair you just created.
  9. Open the AWS Resource Access Manager (AWS RAM) console in your consumer account.
  10. Compare the Availability Zone IDs from the Systems Manager parameter store with the Availability Zone IDs shown on the AWS RAM console.
  11. Identify the corresponding Availability Zone names for the matching Availability Zone IDs.
  12. Open the parameters.py file in the dataFeedMsk folder and insert these Availability Zone names into the variables crossAccountAz1 and crossAccountAz2. For example, in Parameter Store, if the values are “use1-az4” and “use1-az6”, then, when you switch to the consumer account’s AWS RAM console and compare, you may find that these values correspond to the Availability Zone names “us-east-1a” and “us-east-1b”. In that case, you need to update the parameters.py file with these Availability Zone names by setting crossAccountAz1 to “us-east-1a” and crossAccountAz2 to “us-east-1b”.
  13. Set the following environment variables, specifying your consumer AWS account ID:
export CDK_DEFAULT_ACCOUNT={your_aws_account_id}
export CDK_DEFAULT_REGION=us-east-1
  1. Bootstrap the consumer account environment. You need to add specific policies to the AWS CDK role in this case.
    cdk bootstrap aws://{your_aws_account_id}/{your_aws_region} --cloudformation-execution-policies "arn:aws:iam::aws:policy/AmazonMSKFullAccess,arn:aws:iam::aws:policy/AdministratorAccess" –-profile <your-user-profile>

You now need to grant the consumer account access to the MSK cluster.

  1. On the console, copy the consumer AWS account number to your clipboard.
  2. Sign out and sign back in to your producer AWS account.
  3. On the Amazon MSK console, navigate to your MSK cluster.
  4. Choose Properties and scroll down to Security settings.
  5. Choose Edit cluster policy and add the consumer account root to the Principal section as follows, then save the changes:
    "Principal": {
        "AWS": ["arn:aws:iam::<producer-acct-no>:root", "arn:aws:iam::<consumer-acct-no>:root"]
    },
    

  6. Create the IAM role that needs to be attached to the EC2 consumer instance:
    aws iam create-role --role-name awsblog-dev-app-consumerEc2Role --assume-role-policy-document file://dataFeedMsk/ec2ConsumerPolicy.json --profile <your-user-profile>

  7. Deploy the consumer account infrastructure, including the VPC, consumer EC2 instance, security groups, and connectivity to the MSK cluster:
    cdk deploy --all --app "python app2.py" --profile {your_profile_name}

Run the applications and view the data

Now that we have the infrastructure up, we can produce a raw stock quotes feed from the producer EC2 instance to the MSK cluster, enrich it using the Apache Flink application, and consume the enriched feed from the consumer application through PrivateLink. For this post, we use the Flink DataStream Java API for the stock data feed processing and enrichment. We also use Flink aggregations and windowing capabilities to identify insights in a certain time window.

Run the managed Flink application

Complete the following steps to run the managed Flink application:

  1. In your producer account, open the Amazon Managed Service for Apache Flink console and navigate to your application.
  2. To run the application, choose Run, select Run with latest snapshot, and choose Run.
    BDB-3696-FlinkJobRun
  3. When the application changes to the Running state, choose Open Apache Flink dashboard.

You should see your application under Running Jobs.

BDB-3696-FlinkDashboard

Run the Kafka producer application

Complete the following steps to run the Kafka producer application:

  1. On the Amazon EC2 console, locate the IP address of the producer EC2 instance named awsblog-dev-app-kafkaProducerEC2Instance.
  2. Connect to the instance using SSH and run the following commands:
    sudo su
    cd environment
    source alpaca-script/bin/activate
    python3 ec2-script-live.py AMZN NVDA

You need to start the script during market open hours. This will run the script that creates a connection to the Alpaca API. You should see lines of output showing that it is making the connection and subscribing to the given ticker symbols.

View the enriched data feed in OpenSearch Dashboards

Complete the following steps to create an index pattern to view the enriched data in your OpenSearch dashboard:

  1. To find the master user name for OpenSearch, open the config.py file and locate the value assigned to the openSearchMasterUsername parameter.
  2. Open Secrets Manager and click on awsblog-dev-app-openSearchSecrets secret to retrieve the password for OpenSearch.
  3. Navigate to your OpenSearch console and find the URL to your OpenSearch dashboard by clicking on the domain name for your OpenSearch cluster. Click on the URL and sign in using your master user name and password.
  4. In the OpenSearch navigation bar on the left, select Dashboards Management under the Management section.
  5. Choose Index patterns, then choose Create index pattern.
  6. Enter amzn* in the Index pattern name field to match the AMZN ticker, then choose Next step.
    BDB-3696-Opensearch
  7. Select timestamp under Time field and choose Create index pattern.
  8. Choose Discover in the OpenSearch Dashboards navigation pane.
  9. With amzn selected on the index pattern dropdown, select the fields to view the enriched quotes data.

The indicator field has been added to the raw data by Amazon Managed Service for Apache Flink to indicate whether the current price direction is neutral, bullish, or bearish.

Run the Kafka consumer application

To run the consumer application to consume the data feed, you first need to get the multi-VPC brokers URL for the MSK cluster in the producer account.

  1. On the Amazon MSK console, navigate to your MSK cluster and choose View client information.
  2. Copy the value of the Private endpoint (multi-VPC).
  3. SSH to your consumer EC2 instance and run the following commands:
    sudo su
    alias kafka-consumer=/kafka_2.13-3.5.1/bin/kafka-console-consumer.sh
    kafka-consumer --bootstrap-server {$MULTI_VPC_BROKER_URL} --topic amznenhanced --from-beginning --consumer.config ./customer_sasl.properties
    

You should then see lines of output for the enriched data feed like the following:

{"symbol":"AMZN","close":194.64,"open":194.58,"low":194.58,"high":194.64,"volume":255.0,"timestamp":"2024-07-11 19:49:00","%change":-0.8784661217630548,"indicator":"Neutral"}
{"symbol":"AMZN","close":194.77,"open":194.615,"low":194.59,"high":194.78,"volume":1362.0,"timestamp":"2024-07-11 19:50:00","%change":-0.8122628778040887,"indicator":"Neutral"}
{"symbol":"AMZN","close":194.82,"open":194.79,"low":194.77,"high":194.82,"volume":1143.0,"timestamp":"2024-07-11 19:51:00","%change":-0.7868000916660381,"indicator":"Neutral"}

In the output above, no significant changes are happening to the stock prices, so the indicator shows “Neutral”. The Flink application determines the appropriate sentiment based on the stock price movement.

Additional financial services use cases

In this post, we demonstrated how to build a solution that enriches a raw stock quotes feed and identifies stock movement patterns using Amazon MSK and Amazon Managed Service for Apache Flink. Amazon Managed Service for Apache Flink offers various features such as snapshot, checkpointing, and a recently launched Rollback API. These features allow you to build resilient real-time streaming applications.

You can apply this approach to a variety of other use cases in the capital markets domain. In this section, we discuss other cases in which you can use the same architectural patterns.

Real-time data visualization

Using real-time feeds to create charts of stocks is the most common use case for real-time market data in the cloud. You can ingest raw stock prices from data providers or exchanges into an MSK topic and use Amazon Managed Service for Apache Flink to display the high price, low price, and volume over a period of time. This is known as aggregates and is the foundation for displaying candlestick bar graphs. You can also use Flink to determine stock price ranges over time.

BDB-3696-real-time-dv

Stock implied volatility

Implied volatility (IV) is a measure of the market’s expectation of how much a stock’s price is likely to fluctuate in the future. IV is forward-looking and derived from the current market price of an option. It is also used to price new options contracts and is sometimes referred to as the stock market’s fear gauge because it tends to spike higher during market stress or uncertainty. With Amazon Managed Service for Apache Flink, you can consume data from a securities feed that will provide current stock prices and combine this with an options feed that provides contract values and strike prices to calculate the implied volatility.

Technical indicator engine

Technical indicators are used to analyze stock price and volume behavior, provide trading signals, and identify market opportunities, which can help in the decision-making process of trading. Although implied volatility is a technical indicator, there are many other indicators. There can be simple indicators such as “Simple Moving Average” that represent a measure of trend in a specific stock price based on the average of price over a period of time. There are also more complex indicators such as Relative Strength Index (RSI) that measures the momentum of a stock’s price movement. RSI is a mathematical formula that uses the exponential moving average of upward movements and downward movements.

Market alert engine

Graphs and technical indicators aren’t the only tools that you can use to make investment decisions. Alternative data sources are important, such as ticker symbol changes, stock splits, dividend payments, and others. Investors also act on recent news about the company, its competitors, employees, and other potential company-related information. You can use the compute capacity provided by Amazon Managed Service for Apache Flink to ingest, filter, transform, and correlate the different data sources to the stock prices and create an alert engine that can recommend investment actions based on these alternate data sources. Examples can range from invoking an action if dividend prices increase or decrease to using generative artificial intelligence (AI) to summarize several correlated news items from different sources into a single alert about an event.

Market surveillance

Market surveillance is the monitoring and investigation of unfair or illegal trading practices in the stock markets to maintain fair and orderly markets. Both private companies and government agencies conduct market surveillance to uphold rules and protect investors.

You can use Amazon Managed Service for Apache Flink streaming analytics as a powerful surveillance tool. Streaming analytics can detect even subtle instances of market manipulation in real time. By integrating market data feeds with external data sources, such as company merger announcements, news feeds, and social media, streaming analytics can quickly identify potential attempts at market manipulation. This allows regulators to be alerted in real time, enabling them to take prompt action even before the manipulation can fully unfold.

Markets risk management

In fast-paced capital markets, end-of-day risk measurement is insufficient. Firms need real-time risk monitoring to stay competitive. Financial institutions can use Amazon Managed Service for Apache Flink to compute intraday value-at-risk (VaR) in real time. By ingesting market data and portfolio changes, Amazon Managed Service for Apache Flink provides a low-latency, high-performance solution for continuous VaR calculations.

This allows financial institutions to proactively manage risk by quickly identifying and mitigating intraday exposures, rather than reacting to past events. The ability to stream risk analytics empowers firms to optimize portfolios and stay resilient in volatile markets.

Clean up

It’s always a good practice to clean up all the resources you created as part of this post to avoid any additional cost. To clean up your resources, complete the following steps:

  1. Delete the CloudFormation stacks from the consumer account.
  2. Delete the CloudFormation stacks from the provider account.

Conclusion

In this post, we showed you how to provide a real-time financial data feed that can be consumed by your customers using Amazon MSK and Amazon Managed Service for Apache Flink. We used Amazon Managed Service for Apache Flink to enrich a raw data feed and deliver it to Amazon OpenSearch. Using this solution as a template, you can aggregate multiple source feeds, use Flink to calculate in real time any technical indicator, display data and volatility, or create an alert engine. You can add value for your customers by inserting additional financial information within your feed in real time.

We hope you found this post helpful and encourage you to try out this solution to solve interesting financial industry challenges.


About the Authors

Rana Dutt is a Principal Solutions Architect at Amazon Web Services. He has a background in architecting scalable software platforms for financial services, healthcare, and telecom companies, and is passionate about helping customers build on AWS.

Amar Surjit is a Senior Solutions Architect at Amazon Web Services (AWS), where he specializes in data analytics and streaming services. He advises AWS customers on architectural best practices, helping them design reliable, secure, efficient, and cost-effective real-time analytics data systems. Amar works closely with customers to create innovative cloud-based solutions that address their unique business challenges and accelerate their transformation journeys.

Diego Soares is a Principal Solutions Architect at AWS with over 20 years of experience in the IT industry. He has a background in infrastructure, security, and networking. Prior to joining AWS in 2021, Diego worked for Cisco, supporting financial services customers for over 15 years. He works with large financial institutions to help them achieve their business goals with AWS. Diego is passionate about how technology solves business challenges and provides beneficial outcomes by developing complex solution architectures.

Using AWS AppSync and AWS Lake Formation to access a secure data lake through a GraphQL API

Post Syndicated from Rana Dutt original https://aws.amazon.com/blogs/big-data/using-aws-appsync-and-aws-lake-formation-to-access-a-secure-data-lake-through-a-graphql-api/

Data lakes have been gaining popularity for storing vast amounts of data from diverse sources in a scalable and cost-effective way. As the number of data consumers grows, data lake administrators often need to implement fine-grained access controls for different user profiles. They might need to restrict access to certain tables or columns depending on the type of user making the request. Also, businesses sometimes want to make data available to external applications but aren’t sure how to do so securely. To address these challenges, organizations can turn to GraphQL and AWS Lake Formation.

GraphQL provides a powerful, secure, and flexible way to query and retrieve data. AWS AppSync is a service for creating GraphQL APIs that can query multiple databases, microservices, and APIs from one unified GraphQL endpoint.

Data lake administrators can use Lake Formation to govern access to data lakes. Lake Formation offers fine-grained access controls for managing user and group permissions at the table, column, and cell level. It can therefore ensure data security and compliance. Additionally, this Lake Formation integrates with other AWS services, such as Amazon Athena, making it ideal for querying data lakes through APIs.

In this post, we demonstrate how to build an application that can extract data from a data lake through a GraphQL API and deliver the results to different types of users based on their specific data access privileges. The example application described in this post was built by AWS Partner NETSOL Technologies.

Solution overview

Our solution uses Amazon Simple Storage Service (Amazon S3) to store the data, AWS Glue Data Catalog to house the schema of the data, and Lake Formation to provide governance over the AWS Glue Data Catalog objects by implementing role-based access. We also use Amazon EventBridge to capture events in our data lake and launch downstream processes. The solution architecture is shown in the following diagram.

Appsync and LakeFormation Arch itecture diagram

Figure 1 – Solution architecture

The following is a step by step description of the solution:

  1. The data lake is created in an S3 bucket registered with Lake Formation. Whenever new data arrives, an EventBridge rule is invoked.
  2. The EventBridge rule runs an AWS Lambda function to start an AWS Glue crawler to discover new data and update any schema changes so that the latest data can be queried.
    Note: AWS Glue crawlers can also be launched directly from Amazon S3 events, as described in this blog post.
  3. AWS Amplify allows users to sign in using Amazon Cognito as an identity provider. Cognito authenticates the user’s credentials and returns access tokens.
  4. Authenticated users invoke an AWS AppSync GraphQL API through Amplify, fetching data from the data lake. A Lambda function is run to handle the request.
  5. The Lambda function retrieves the user details from Cognito and assumes the AWS Identity and Access Management (IAM) role associated with the requesting user’s Cognito user group.
  6. The Lambda function then runs an Athena query against the data lake tables and returns the results to AWS AppSync, which then returns the results to the user.

Prerequisites

To deploy this solution, you must first do the following:

git clone [email protected]:aws-samples/aws-appsync-with-lake-formation.git
cd aws-appsync-with-lake-formation

Prepare Lake Formation permissions

Sign in to the LakeFormation console and add yourself as an administrator. If you’re signing in to Lake Formation for the first time, you can do this by selecting Add myself on the Welcome to Lake Formation screen and choosing Get started as shown in Figure 2.

Figure 2 – Add yourself as the Lake Formation administrator

Otherwise, you can choose Administrative roles and tasks in the left navigation bar and choose Manage Administrators to add yourself. You should see your IAM username under Data lake administrators with Full access when done.

Select Data catalog settings in the left navigation bar and make sure the two IAM access control boxes are not selected, as shown in Figure 3. You want Lake Formation, not IAM, to control access to new databases.

Lake Formation data catalog settings

Figure 3 – Lake Formation data catalog settings

Deploy the solution

To create the solution in your AWS environment, launch the following AWS CloudFormation stack:  Launch Cloudformation Stack

The following resources will be launched through the CloudFormation template:

  • Amazon VPC and networking components (subnets, security groups, and NAT gateway)
  • IAM roles
  • Lake Formation encapsulating S3 bucket, AWS Glue crawler, and AWS Glue database
  • Lambda functions
  • Cognito user pool
  • AWS AppSync GraphQL API
  • EventBridge rules

After the required resources have been deployed from the CloudFormation stack, you must create two Lambda functions and upload the dataset to Amazon S3. Lake Formation will govern the data lake that is stored in the S3 bucket.

Create the Lambda functions

Whenever a new file is placed in the designated S3 bucket, an EventBridge rule is invoked, which launches a Lambda function to initiate the AWS Glue crawler. The crawler updates the AWS Glue Data Catalog to reflect any changes to the schema.

When the application makes a query for data through the GraphQL API, a request handler Lambda function is invoked to process the query and return the results.

To create these two Lambda functions, proceed as follows.

  1. Sign in to the Lambda console.
  2. Select the request handler Lambda function named dl-dev-crawlerLambdaFunction.
  3. Find the crawler Lambda function file in your lambdas/crawler-lambda folder in the git repo that you cloned to your local machine.
  4. Copy and paste the code in that file to the Code section of the dl-dev-crawlerLambdaFunction in your Lambda console. Then choose Deploy to deploy the function.
Copy and paste code into the Lambda function

Figure 4 – Copy and paste code into the Lambda function

  1. Repeat steps 2 through 4 for the request handler function named dl-dev-requestHandlerLambdaFunction using the code in lambdas/request-handler-lambda.

Create a layer for the request handler Lambda

You now must upload some additional library code needed by the request handler Lambda function.

  1. Select Layers in the left menu and choose Create layer.
  2. Enter a name such as appsync-lambda-layer.
  3. Download this package layer ZIP file to your local machine.
  4. Upload the ZIP file using the Upload button on the Create layer page.
  5. Choose Python 3.7 as the runtime for the layer.
  6. Choose Create.
  7. Select Functions on the left menu and select the dl-dev-requestHandler Lambda function.
  8. Scroll down to the Layers section and choose Add a layer.
  9. Select the Custom layers option and then select the layer you created above.
  10. Click Add.

Upload the data to Amazon S3

Navigate to the root directory of the cloned git repository and run the following commands to upload the sample dataset. Replace the bucket_name placeholder with the S3 bucket provisioned using the CloudFormation template. You can get the bucket name from the CloudFormation console by going to the Outputs tab with key datalakes3bucketName as shown in image below.

Figure 5 – S3 bucket name shown in CloudFormation Outputs tab

Figure 5 – S3 bucket name shown in CloudFormation Outputs tab

Enter the following commands in your project folder in your local machine to upload the dataset to the S3 bucket.

cd dataset
aws s3 cp . s3://bucket_name/ --recursive

Now let’s take a look at the deployed artifacts.

Data lake

The S3 bucket holds sample data for two entities: companies and their respective owners. The bucket is registered with Lake Formation, as shown in Figure 6. This enables Lake Formation to create and manage data catalogs and manage permissions on the data.

Figure 6 – Lake Formation console showing data lake location

Figure 6 – Lake Formation console showing data lake location

A database is created to hold the schema of data present in Amazon S3. An AWS Glue crawler is used to update any change in schema in the S3 bucket. This crawler is granted permission to CREATE, ALTER, and DROP tables in the database using Lake Formation.

Apply data lake access controls

Two IAM roles are created, dl-us-east-1-developer and dl-us-east-1-business-analyst, each assigned to a different Cognito user group. Each role is assigned different authorizations through Lake Formation. The Developer role gains access to every column in the data lake, while the Business Analyst role is only granted access to the non-personally identifiable information (PII) columns.

Lake Formation console data lake permissions assigned to group roles

Figure 7 –Lake Formation console data lake permissions assigned to group roles

GraphQL schema

The GraphQL API is viewable from the AWS AppSync console. The Companies type includes several attributes describing the owners of the companies.

Schema for GraphQL API

Figure 8 – Schema for GraphQL API

The data source for the GraphQL API is a Lambda function, which handles the requests.

– AWS AppSync data source mapped to Lambda function

Figure 9 – AWS AppSync data source mapped to Lambda function

Handling the GraphQL API requests

The GraphQL API request handler Lambda function retrieves the Cognito user pool ID from the environment variables. Using the boto3 library, you create a Cognito client and use the get_group method to obtain the IAM role associated to the Cognito user group.

You use a helper function in the Lambda function to obtain the role.

def get_cognito_group_role(group_name):
    response = cognito_idp_client.get_group(
            GroupName=group_name,
            UserPoolId=cognito_user_pool_id
        )
    print(response)
    role_arn = response.get('Group').get('RoleArn')
    return role_arn

Using the AWS Security Token Service (AWS STS) through a boto3 client, you can assume the IAM role and obtain the temporary credentials you need to run the Athena query.

def get_temp_creds(role_arn):
    response = sts_client.assume_role(
        RoleArn=role_arn,
        RoleSessionName='stsAssumeRoleAthenaQuery',
    )
    return response['Credentials']['AccessKeyId'],
response['Credentials']['SecretAccessKey'],  response['Credentials']['SessionToken']

We pass the temporary credentials as parameters when creating our Boto3 Amazon Athena client.

athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, aws_session_token=session_token)

The client and query are passed into our Athena query helper function which executes the query and returns a query id. With the query id, we are able to read the results from S3 and bundle it as a Python dictionary to be returned in the response.

def get_query_result(s3_client, output_location):
    bucket, object_key_path = get_bucket_and_path(output_location)
    response = s3_client.get_object(Bucket=bucket, Key=object_key_path)
    status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
    result = []
    if status == 200:
        print(f"Successful S3 get_object response. Status - {status}")
        df = pandas.read_csv(response.get("Body"))
        df = df.fillna('')
        result = df.to_dict('records')
        print(result)
    else:
        print(f"Unsuccessful S3 get_object response. Status - {status}")
    return result

Enabling client-side access to the data lake

On the client side, AWS Amplify is configured with an Amazon Cognito user pool for authentication. We’ll navigate to the Amazon Cognito console to view the user pool and groups that were created.

Figure 10 –Amazon Cognito User pools

Figure 10 –Amazon Cognito User pools

For our sample application we have two groups in our user pool:

  • dl-dev-businessAnalystUserGroup – Business analysts with limited permissions.
  • dl-dev-developerUserGroup – Developers with full permissions.

If you explore these groups, you’ll see an IAM role associated to each. This is the IAM role that is assigned to the user when they authenticate. Athena assumes this role when querying the data lake.

If you view the permissions for this IAM role, you’ll notice that it doesn’t include access controls below the table level. You need the additional layer of governance provided by Lake Formation to add fine-grained access control.

After the user is verified and authenticated by Cognito, Amplify uses access tokens to invoke the AWS AppSync GraphQL API and fetch the data. Based on the user’s group, a Lambda function assumes the corresponding Cognito user group role. Using the assumed role, an Athena query is run and the result returned to the user.

Create test users

Create two users, one for dev and one for business analyst, and add them to user groups.

  1. Navigate to Cognito and select the user pool, dl-dev-cognitoUserPool, that’s created.
  2. Choose Create user and provide the details to create a new business analyst user. The username can be biz-analyst. Leave the email address blank, and enter a password.
  3. Select the Users tab and select the user you just created.
  4. Add this user to the business analyst group by choosing the Add user to group button.
  5. Follow the same steps to create another user with the username developer and add the user to the developers group.

Test the solution

To test your solution, launch the React application on your local machine.

  1. In the cloned project directory, navigate to the react-app directory.
  2. Install the project dependencies.
npm install
  1. Install the Amplify CLI:
npm install -g @aws-amplify/cli
  1. Create a new file called .env by running the following commands. Then use a text editor to update the environment variable values in the file.
echo export REACT_APP_APPSYNC_URL=Your AppSync endpoint URL > .env
echo export REACT_APP_CLIENT_ID=Your Cognito app client ID >> .env
echo export REACT_APP_USER_POOL_ID=Your Cognito user pool ID >> .env

Use the Outputs tab of your CloudFormation console stack to get the required values from the keys as follows:

REACT_APP_APPSYNC_URL appsyncApiEndpoint
REACT_APP_CLIENT_ID cognitoUserPoolClientId
REACT_APP_USER_POOL_ID cognitoUserPoolId
  1. Add the preceding variables to your environment.
source .env
  1. Generate the code needed to interact with the API using Amplify CodeGen. In the Outputs tab of your Cloudformation console, find your AWS Appsync API ID next to the appsyncApiId key.
amplify add codegen --apiId <appsyncApiId>

Accept all the default options for the above command by pressing Enter at each prompt.

  1. Start the application.
npm start

You can confirm that the application is running by visiting http://localhost:3000 and signing in as the developer user you created earlier.

Now that you have the application running, let’s take a look at how each role is served from the companies endpoint.

First, sign is as the developer role, which has access to all the fields, and make the API request to the companies endpoint. Note which fields you have access to.

The results for developer role

Figure 11 –The results for developer role

Now, sign in as the business analyst user and make the request to the same endpoint and compare the included fields.

The results for Business Analyst role

Figure 12 –The results for Business Analyst role

The First Name and Last Name columns of the companies list is excluded in the business analyst view even though you made the request to the same endpoint. This demonstrates the power of using one unified GraphQL endpoint together with multiple Cognito user group IAM roles mapped to Lake Formation permissions to manage role-based access to your data.

Cleaning up

After you’re done testing the solution, clean up the following resources to avoid incurring future charges:

  1. Empty the S3 buckets created by the CloudFormation template.
  2. Delete the CloudFormation stack to remove the S3 buckets and other resources.

Conclusion

In this post, we showed you how to securely serve data in a data lake to authenticated users of a React application based on their role-based access privileges. To accomplish this, you used GraphQL APIs in AWS AppSync, fine-grained access controls from Lake Formation, and Cognito for authenticating users by group and mapping them to IAM roles. You also used Athena to query the data.

For related reading on this topic, see Visualizing big data with AWS AppSync, Amazon Athena, and AWS Amplify and Design a data mesh architecture using AWS Lake Formation and AWS Glue.

Will you implement this approach for serving data from your data lake? Let us know in the comments!


About the Authors

Rana Dutt is a Principal Solutions Architect at Amazon Web Services. He has a background in architecting scalable software platforms for financial services, healthcare, and telecom companies, and is passionate about helping customers build on AWS.

Ranjith Rayaprolu is a Senior Solutions Architect at AWS working with customers in the Pacific Northwest. He helps customers design and operate Well-Architected solutions in AWS that address their business problems and accelerate the adoption of AWS services. He focuses on AWS security and networking technologies to develop solutions in the cloud across different industry verticals. Ranjith lives in the Seattle area and loves outdoor activities.

Justin Leto is a Sr. Solutions Architect at Amazon Web Services with specialization in databases, big data analytics, and machine learning. His passion is helping customers achieve better cloud adoption. In his spare time, he enjoys offshore sailing and playing jazz piano. He lives in New York City with his wife and baby daughter.