Tag Archives: AWS Glue

Validate streaming data over Amazon MSK using schemas in cross-account AWS Glue Schema Registry

Post Syndicated from Vikas Bajaj original https://aws.amazon.com/blogs/big-data/validate-streaming-data-over-amazon-msk-using-schemas-in-cross-account-aws-glue-schema-registry/

Today’s businesses face an unprecedented growth in the volume of data. A growing portion of the data is generated in real time by IoT devices, websites, business applications, and various other sources. Businesses need to process and analyze this data as soon as it arrives to make business decisions in real time. Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed service that enables building and running stream processing applications that use Apache Kafka to collect and process data in real time.

Stream processing applications using Apache Kafka don’t communicate with each other directly; they communicate via sending and receiving messages over Kafka topics. For stream processing applications to communicate efficiently and confidently, a message payload structure must be defined in terms of attributes and data types. This structure describes the schema applications use when sending and receiving messages. However, with a large number of producer and consumer applications, even a small change in schema (removing a field, adding a new field, or change in data type) may cause issues for downstream applications that are difficult to debug and fix.

Traditionally, teams have relied on change management processes (such as approvals and maintenance windows) or other informal mechanisms (documentation, emails, collaboration tools, and so on) to inform one another of data schema changes. However, these mechanisms don’t scale and are prone to mistakes. The AWS Glue Schema Registry allows you to centrally publish, discover, control, validate, and evolve schemas for stream processing applications. With the AWS Glue Schema Registry, you can manage and enforce schemas on data streaming applications using Apache Kafka, Amazon MSK, Amazon Kinesis Data Streams, Amazon Kinesis Data Analytics for Apache Flink, and AWS Lambda.

This post demonstrates how Apache Kafka stream processing applications validate messages using an Apache Avro schema stored in the AWS Glue Schema registry residing in a central AWS account. We use the AWS Glue Schema Registry SerDe library and Avro SpecificRecord to validate messages in stream processing applications while sending and receiving messages from a Kafka topic on an Amazon MSK cluster. Although we use an Avro schema for this post, the same approach and concept applies to JSON schemas as well.

Use case

Let’s assume a fictitious rideshare company that offers unicorn rides. To draw actionable insights, they need to process a stream of unicorn ride request messages. They expect rides to be very popular and want to make sure their solution can scale. They’re also building a central data lake where all their streaming and operation data is stored for analysis. They’re customer obsessed, so they expect to add new fun features to future rides, like choosing the hair color of your unicorn, and will need to reflect these attributes in the ride request messages. To avoid issues in downstream applications due to future schema changes, they need a mechanism to validate messages with a schema hosted in a central schema registry. Having schemas in a central schema registry makes it easier for the application teams to publish, validate, evolve, and maintain schemas in a single place.

Solution overview

The company uses Amazon MSK to capture and distribute the unicorn ride request messages at scale. They define an Avro schema for unicorn ride requests because it provides rich data structures, supports direct mapping to JSON, as well as a compact, fast, and binary data format. Because the schema was agreed in advance, they decided to use Avro SpecificRecord.SpecificRecord is an interface from the Avro library that allows the use of an Avro record as a POJO. This is done by generating a Java class (or classes) from the schema, by using avro-maven-plugin. They use AWS Identity and Access Management (IAM) cross-account roles to allow producer and consumer applications from the other AWS account to safely and securely access schemas in the central Schema Registry account.

The AWS Glue Schema Registry is in Account B, whereas the MSK cluster and Kafka producer and consumer applications are in Account A. We use the following two IAM roles to enable cross-account access to the AWS Glue Schema Registry. Apache Kafka clients in Account A assume a role in Account B using an identity-based policy because the AWS Glue Schema Registry doesn’t support resource-based policies.

  • Account A IAM role – Allows producer and consumer applications to assume an IAM role in Account B.
  • Account B IAM role – Trusts all IAM principals from Account A and allows them to perform read actions on the AWS Glue Schema Registry in Account B. In a real use case scenario, IAM principals that can assume cross-account roles should be scoped more specifically.

The following architecture diagram illustrates the solution:

The solution works as follows:

  1. A Kafka producer running in Account A assumes the cross-account Schema Registry IAM role in Account B by calling the AWS Security Token Service (AWS STS) assumeRole API.
  2. The Kafka producer retrieves the unicorn ride request Avro schema version ID from the AWS Glue Schema Registry for the schema that’s embedded in the unicorn ride request POJO. Fetching the schema version ID is internally managed by the AWS Glue Schema Registry SerDe’s serializer. The serializer has to be configured as part of the Kafka producer configuration.
  3. If the schema exists in the AWS Glue Schema Registry, the serializer decorates the data record with the schema version ID and then serializes it before delivering it to the Kafka topic on the MSK cluster.
  4. The Kafka consumer running in Account A assumes the cross-account Schema Registry IAM role in Account B by calling the AWS STS assumeRole API.
  5. The Kafka consumer starts polling the Kafka topic on the MSK cluster for data records.
  6. The Kafka consumer retrieves the unicorn ride request Avro schema from the AWS Glue Schema Registry, matching the schema version ID that’s encoded in the unicorn ride request data record. Fetching the schema is internally managed by the AWS Glue Schema Registry SerDe’s deserializer. The deserializer has to be configured as part of the Kafka consumer configuration. If the schema exists in the AWS Glue Schema Registry, the deserializer deserializes the data record into the unicorn ride request POJO for the consumer to process it.

The AWS Glue Schema Registry SerDe library also supports optional compression configuration to save on data transfers. For more information about the Schema Registry, see How the Schema Registry works.

Unicorn ride request Avro schema

The following schema (UnicornRideRequest.avsc) defines a record representing a unicorn ride request, which contains ride request attributes along with the customer attributes and system-recommended unicorn attributes:

{
    "type": "record",
    "name": "UnicornRideRequest",
    "namespace": "demo.glue.schema.registry.avro",
    "fields": [
      {"name": "request_id", "type": "int", "doc": "customer request id"},
      {"name": "pickup_address","type": "string","doc": "customer pickup address"},
      {"name": "destination_address","type": "string","doc": "customer destination address"},
      {"name": "ride_fare","type": "float","doc": "ride fare amount (USD)"},
      {"name": "ride_duration","type": "int","doc": "ride duration in minutes"},
      {"name": "preferred_unicorn_color","type": {"type": "enum","name": "UnicornPreferredColor","symbols": ["WHITE","BLACK","RED","BLUE","GREY"]}, "default": "WHITE"},
      {
        "name": "recommended_unicorn",
        "type": {
          "type": "record",
          "name": "RecommendedUnicorn",
          "fields": [
            {"name": "unicorn_id","type": "int", "doc": "recommended unicorn id"},
            {"name": "color","type": {"type": "enum","name": "unicorn_color","symbols": ["WHITE","RED","BLUE"]}},
            {"name": "stars_rating", "type": ["null", "int"], "default": null, "doc": "unicorn star ratings based on customers feedback"}
          ]
        }
      },
      {
        "name": "customer",
        "type": {
          "type": "record",
          "name": "Customer",
          "fields": [
            {"name": "customer_account_no","type": "int", "doc": "customer account number"},
            {"name": "first_name","type": "string"},
            {"name": "middle_name","type": ["null","string"], "default": null},
            {"name": "last_name","type": "string"},
            {"name": "email_addresses","type": ["null", {"type":"array", "items":"string"}]},
            {"name": "customer_address","type": "string","doc": "customer address"},
            {"name": "mode_of_payment","type": {"type": "enum","name": "ModeOfPayment","symbols": ["CARD","CASH"]}, "default": "CARD"},
            {"name": "customer_rating", "type": ["null", "int"], "default": null}
          ]
        }
      }
    ]
  }

Prerequisites

To use this solution, you must have two AWS accounts:

  • Account A – For the MSK cluster, Kafka producer and consumer Amazon Elastic Compute Cloud (Amazon EC2) instances, and AWS Cloud9 environment
  • Account B – For the Schema Registry and schema

For this solution, we use Region us-east-1, but you can change this as per your requirements.

Next, we create the resources in each account using AWS CloudFormation templates.

Create resources in Account B

We create the following resources in Account B:

  • A schema registry
  • An Avro schema
  • An IAM role with the AWSGlueSchemaRegistryReadonlyAccess managed policy and an instance profile, which allows all Account A IAM principals to assume it
  • The UnicornRideRequest.avsc Avro schema shown earlier, which is used as a schema definition in the CloudFormation template

Make sure you have the appropriate permissions to create these resources.

  1. Log in to Account B.
  2. Launch the following CloudFormation stack.
  3. For Stack name, enter SchemaRegistryStack.
  4. For Schema Registry name, enter unicorn-ride-request-registry.
  5. For Avro Schema name, enter unicorn-ride-request-schema-avro.
  6. For the Kafka client’s AWS account ID, enter your Account A ID.
  7. For ExternalId, enter a unique random ID (for example, demo10A), which should be provided by the Kafka clients in Account A while assuming the IAM role in this account.

For more information about cross-account security, see The confused deputy problem.

  1. When the stack is complete, on the Outputs tab of the stack, copy the value for CrossAccountGlueSchemaRegistryRoleArn.

The Kafka producer and consumer applications created in Account A assume this role to access the Schema Registry and schema in Account B.

  1. To verify the resources were created, on the AWS Glue console, choose Schema registries in the navigation bar, and locate unicorn-ride-request-registry.
  2. Choose the registry unicorn-ride-request-registry and verify that it contains unicorn-ride-request-schema-avro in the Schemas section.
  3. Choose the schema to see its content.

The IAM role created by the SchemaRegistryStack stack allows all Account A IAM principals to assume it and perform read actions on the AWS Glue Schema Registry. Let’s look at the trust relationships of the IAM role.

  1. On the SchemaRegistryStack stack Outputs tab, copy the value for CrossAccountGlueSchemaRegistryRoleName.
  2. On the IAM console, search for this role.
  3. Choose Trust relationships and look at its trusted entities to confirm that Account A is listed.
  4. In the Conditions section, confirm that sts:ExternalId has the same unique random ID provided during stack creation.

Create resources in Account A

We create the following resources in Account A:

  • A VPC
  • EC2 instances for the Kafka producer and consumer
  • An AWS Cloud9 environment
  • An MSK cluster

As a prerequisite, create an EC2 keypair and download it on your machine to be able to SSH into EC2 instances. Also create an MSK cluster configuration with default values. You need to have permissions to create the CloudFormation stack, EC2 instances, AWS Cloud9 environment, MSK cluster, MSK cluster configuration, and IAM role.

  1. Log in to Account A.
  2. Launch the following CloudFormation stack to launch the VPC, EC2 instances, and AWS Cloud9 environment.
  3. For Stack name, enter MSKClientStack.
  4. Provide the VPC and subnet CIDR ranges.
  5. For EC2 Keypair, choose an existing EC2 keypair.
  6. For the latest EC2 AMI ID, select the default option.
  7. For the cross-account IAM role ARN, use the value for CrossAccountGlueSchemaRegistryRoleArn (available on the Outputs tab of SchemaRegistryStack).
  8. Wait for the stack to create successfully.
  9. Launch the following CloudFormation stack to create the MSK cluster.
  10. For Stack name, enter MSKClusterStack.
  11. Use Amazon MSK version 2.7.1.
  12. For the MSK cluster configuration ARN, enter the MSK cluster configuration ARN. One that you created as part of the prerequisite.
  13. For the MSK cluster configuration revision number, enter 1 or change it according to your version.
  14. For the client CloudFormation stack name, enter MSKClientStack (the stack name that you created prior to this stack).

Configure the Kafka producer

To configure the Kafka producer accessing the Schema Registry in the central AWS account, complete the following steps:

  1. Log in to Account A.
  2. On the AWS Cloud9 console, choose the Cloud9EC2Bastion environment created by the MSKClientStack stack.
  3. On the File menu, choose Upload Local Files.
  4. Upload the EC2 keypair file that you used earlier while creating the stack.
  5. Open a new terminal and change the EC2 keypair permissions:
    chmod 0400 <keypair PEM file>

  6. SSH into the KafkaProducerInstance EC2 instance and set the Region as per your requirement:
    ssh -i <keypair PEM file> [email protected]<KafkaProducerInstance Private IP address>
    aws configure set region <region>

  7. Set the environment variable MSK_CLUSTER_ARN pointing to the MSK cluster’s ARN:
    export MSK_CLUSTER_ARN=$(aws kafka list-clusters |  jq '.ClusterInfoList[] | select (.ClusterName == "MSKClusterStack") | {ClusterArn} | join (" ")' | tr -d \")

Change the .ClusterName value in the code if you used a different name for the MSK cluster CloudFormation stack. The cluster name is the same as the stack name.

  1. Set the environment variable BOOTSTRAP_BROKERS pointing to the bootstrap brokers:
    export BOOTSTRAP_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerString)

  2. Verify the environment variables:
    echo $MSK_CLUSTER_ARN
    echo $BOOTSTRAP_BROKERS

  3. Create a Kafka topic called unicorn-ride-request-topic in your MSK cluster, which is used by the Kafka producer and consumer applications later:
    cd ~/kafka
    
    ./bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS \
    --topic unicorn-ride-request-topic \
    --create --partitions 3 --replication-factor 2
    
    ./bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_BROKERS --list

The MSKClientStack stack copied the Kafka producer client JAR file called kafka-cross-account-gsr-producer.jar to the KafkaProducerInstance instance. It contains the Kafka producer client that sends messages to the Kafka topic unicorn-ride-request-topic on the MSK cluster and accesses the unicorn-ride-request-schema-avro Avro schema from the unicorn-ride-request-registry schema registry in Account B. The Kafka producer code, which we cover later in this post, is available on GitHub.

  1. Run the following commands and verify kafka-cross-account-gsr-producer.jar exists:
    cd ~
    ls -ls

  2. Run the following command to run the Kafka producer in the KafkaProducerInstance terminal:
    java -jar kafka-cross-account-gsr-producer.jar -bs $BOOTSTRAP_BROKERS \
    -rn <Account B IAM role arn that Kafka producer application needs to assume> \
    -topic unicorn-ride-request-topic \
    -reg us-east-1 \
    -nm 500 \
    -externalid <Account B IAM role external Id that you used while creating a CF stack in Account B>

The code has the following parameters:

  • -bs$BOOTSTRAP_BROKERS (the MSK cluster bootstrap brokers)
  • -rn – The CrossAccountGlueSchemaRegistryRoleArn value from the SchemaRegistryStack stack outputs in Account B
  • -topic – the Kafka topic unicorn-ride-request-topic
  • -regus-east-1 (change it according to your Region, it’s used for the AWS STS endpoint and Schema Registry)
  • -nm: 500 (the number of messages the producer application sends to the Kafka topic)
  • -externalId – The same external ID (for example, demo10A) that you used while creating the CloudFormation stack in Account B

The following screenshot shows the Kafka producer logs showing Schema Version Id received..., which means it has retrieved the Avro schema unicorn-ride-request-schema-avro from Account B and messages were sent to the Kafka topic on the MSK cluster in Account A.

Kafka producer code

The complete Kafka producer implementation is available on GitHub. In this section, we break down the code.

  • getProducerConfig() initializes the producer properties, as shown in the following code:
    • VALUE_SERIALIZER_CLASS_CONFIG – The GlueSchemaRegistryKafkaSerializer.class.getName() AWS serializer implementation that serializes data records (the implementation is available on GitHub)
    • REGISTRY_NAME – The Schema Registry from Account B
    • SCHEMA_NAME – The schema name from Account B
    • AVRO_RECORD_TYPEAvroRecordType.SPECIFIC_RECORD
private Properties getProducerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, "-1");
        props.put(ProducerConfig.CLIENT_ID_CONFIG,"msk-cross-account-gsr-producer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
        props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
        props.put(AWSSchemaRegistryConstants.AWS_REGION,regionName);
        props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "unicorn-ride-request-registry");
        props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "unicorn-ride-request-schema-avro");
        props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
        return props;
}
  • startProducer() assumes the role in Account B to be able to connect with the Schema Registry in Account B and sends messages to the Kafka topic on the MSK cluster:
public void startProducer() {
        assumeGlueSchemaRegistryRole();
        KafkaProducer<String, UnicornRideRequest> producer = 
		new KafkaProducer<String,UnicornRideRequest>(getProducerConfig());
        int numberOfMessages = Integer.valueOf(str_numOfMessages);
        logger.info("Starting to send records...");
        for(int i = 0;i < numberOfMessages;i ++)
        {
            UnicornRideRequest rideRequest = getRecord(i);
            String key = "key-" + i;
            ProducerRecord<String, UnicornRideRequest> record = 
		new ProducerRecord<String, UnicornRideRequest>(topic, key, rideRequest);
            producer.send(record, new ProducerCallback());
        }
 }
  • assumeGlueSchemaRegistryRole() as shown in the following code uses AWS STS to assume the cross-account Schema Registry IAM role in Account B. (For more information, see Temporary security credentials in IAM.) The response from stsClient.assumeRole(roleRequest) contains the temporary credentials, which include accessKeyId, secretAccessKey, and a sessionToken. It then sets the temporary credentials in the system properties. The AWS SDK for Java uses these credentials while accessing the Schema Registry (through the Schema Registry serializer). For more information, see Using Credentials.
    public void assumeGlueSchemaRegistryRole() {
            try {
    	   Region region = Region.of(regionName);
                if(!Region.regions().contains(region))
                     throw new RuntimeException("Region : " + regionName + " is invalid.");
                StsClient stsClient = StsClient.builder().region(region).build();
                AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
                        .roleArn(this.assumeRoleARN)
                        .roleSessionName("kafka-producer-cross-account-glue-schemaregistry-demo")
    	           .externalId(this.externalId)	
                        .build();
                AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest);
                Credentials myCreds = roleResponse.credentials();
                System.setProperty("aws.accessKeyId", myCreds.accessKeyId());
                System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey());
                System.setProperty("aws.sessionToken", myCreds.sessionToken());
                stsClient.close();
            } catch (StsException e) {
                logger.error(e.getMessage());
                System.exit(1);
            }
        }

  • createUnicornRideRequest() uses the Avro schema (unicorn ride request schema) generated classes to create a SpecificRecord. For this post, the unicorn ride request attributes values are hard-coded in this method. See the following code:
    public UnicornRideRequest getRecord(int requestId){
                /*
                 Initialise UnicornRideRequest object of
                 class that is generated from AVRO Schema
                 */
               UnicornRideRequest rideRequest = UnicornRideRequest.newBuilder()
                .setRequestId(requestId)
                .setPickupAddress("Melbourne, Victoria, Australia")
                .setDestinationAddress("Sydney, NSW, Aus")
                .setRideFare(1200.50F)
                .setRideDuration(120)
                .setPreferredUnicornColor(UnicornPreferredColor.WHITE)
                .setRecommendedUnicorn(RecommendedUnicorn.newBuilder()
                        .setUnicornId(requestId*2)
                        .setColor(unicorn_color.WHITE)
                        .setStarsRating(5).build())
                .setCustomer(Customer.newBuilder()
                        .setCustomerAccountNo(1001)
                        .setFirstName("Dummy")
                        .setLastName("User")
                        .setEmailAddresses(Arrays.asList("[email protected]"))
                        .setCustomerAddress("Flinders Street Station")
                        .setModeOfPayment(ModeOfPayment.CARD)
                        .setCustomerRating(5).build()).build();
                logger.info(rideRequest.toString());
                return rideRequest;
        }

Configure the Kafka consumer

The MSKClientStack stack created the KafkaConsumerInstance instance for the Kafka consumer application. You can view all the instances created by the stack on the Amazon EC2 console.

To configure the Kafka consumer accessing the Schema Registry in the central AWS account, complete the following steps:

  1. Open a new terminal in the Cloud9EC2Bastion AWS Cloud9 environment.
  2. SSH into the KafkaConsumerInstance EC2 instance and set the Region as per your requirement:
    ssh -i <keypair PEM file> [email protected]<KafkaConsumerInstance Private IP address>
    aws configure set region <region>

  3. Set the environment variable MSK_CLUSTER_ARN pointing to the MSK cluster’s ARN:
    export MSK_CLUSTER_ARN=$(aws kafka list-clusters |  jq '.ClusterInfoList[] | select (.ClusterName == "MSKClusterStack") | {ClusterArn} | join (" ")' | tr -d \")

Change the .ClusterName value if you used a different name for the MSK cluster CloudFormation stack. The cluster name is the same as the stack name.

  1. Set the environment variable BOOTSTRAP_BROKERS pointing to the bootstrap brokers:
    export BOOTSTRAP_BROKERS=$(aws kafka get-bootstrap-brokers --cluster-arn $MSK_CLUSTER_ARN | jq -r .BootstrapBrokerString)

  2. Verify the environment variables:
    echo $MSK_CLUSTER_ARN
    echo $BOOTSTRAP_BROKERS

The MSKClientStack stack copied the Kafka consumer client JAR file called kafka-cross-account-gsr-consumer.jar to the KafkaConsumerInstance instance. It contains the Kafka consumer client that reads messages from the Kafka topic unicorn-ride-request-topic on the MSK cluster and accesses the unicorn-ride-request-schema-avro Avro schema from the unicorn-ride-request-registry registry in Account B. The Kafka consumer code, which we cover later in this post, is available on GitHub.

  1. Run the following commands and verify kafka-cross-account-gsr-consumer.jar exists:
    cd ~
    ls -ls

  2. Run the following command to run the Kafka consumer in the KafkaConsumerInstance terminal:
    java -jar kafka-cross-account-gsr-consumer.jar -bs $BOOTSTRAP_BROKERS \
    -rn <Account B IAM role arn that Kafka consumer application needs to assume> \
    -topic unicorn-ride-request-topic \
    -reg us-east-1 \
    -externalid <Account B IAM role external Id that you used while creating a CF stack in Account B>

The code has the following parameters:

  • -bs$BOOTSTRAP_BROKERS (the MSK cluster bootstrap brokers)
  • -rn – The CrossAccountGlueSchemaRegistryRoleArn value from the SchemaRegistryStack stack outputs in Account B
  • -topic – The Kafka topic unicorn-ride-request-topic
  • -regus-east-1 (change it according to your Region, it’s used for the AWS STS endpoint and Schema Registry)
  • -externalId – The same external ID (for example, demo10A) that you used while creating the CloudFormation stack in Account B

The following screenshot shows the Kafka consumer logs successfully reading messages from the Kafka topic on the MSK cluster in Account A and accessing the Avro schema unicorn-ride-request-schema-avro from the unicorn-ride-request-registry schema registry in Account B.

If you see the similar logs, it means both the Kafka consumer applications have been able to connect successfully with the centralized Schema Registry in Account B and are able to validate messages while sending and consuming messages from the MSK cluster in Account A.

Kafka consumer code

The complete Kafka consumer implementation is available on GitHub. In this section, we break down the code.

  • getConsumerConfig() initializes consumer properties, as shown in the following code:
    • VALUE_DESERIALIZER_CLASS_CONFIG – The GlueSchemaRegistryKafkaDeserializer.class.getName() AWS deserializer implementation that deserializes the SpecificRecord as per the encoded schema ID from the Schema Registry (the implementation is available on GitHub).
    • AVRO_RECORD_TYPEAvroRecordType.SPECIFIC_RECORD
private Properties getConsumerConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "unicorn.riderequest.consumer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        props.put(AWSSchemaRegistryConstants.AWS_REGION, regionName);
        props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.SPECIFIC_RECORD.getName());
        return props;
}
  • startConsumer() assumes the role in Account B to be able to connect with the Schema Registry in Account B and reads messages from the Kafka topic on the MSK cluster:
public void startConsumer() {
  logger.info("starting consumer...");
  assumeGlueSchemaRegistryRole();
  KafkaConsumer<String, UnicornRideRequest> consumer = new KafkaConsumer<String, UnicornRideRequest>(getConsumerConfig());
  consumer.subscribe(Collections.singletonList(topic));
  int count = 0;
  while (true) {
            final ConsumerRecords<String, UnicornRideRequest> records = consumer.poll(Duration.ofMillis(1000));
            for (final ConsumerRecord<String, UnicornRideRequest> record : records) {
                final UnicornRideRequest rideRequest = record.value();
                logger.info(String.valueOf(rideRequest.getRequestId()));
                logger.info(rideRequest.toString());
            }
        }
}
  • assumeGlueSchemaRegistryRole() as shown in the following code uses AWS STS to assume the cross-account Schema Registry IAM role in Account B. The response from stsClient.assumeRole(roleRequest) contains the temporary credentials, which include accessKeyId, secretAccessKey, and a sessionToken. It then sets the temporary credentials in the system properties. The SDK for Java uses these credentials while accessing the Schema Registry (through the Schema Registry serializer). For more information, see Using Credentials.
public void assumeGlueSchemaRegistryRole() {
        try {
	Region region = Region.of(regionName);
            if(!Region.regions().contains(region))
                 throw new RuntimeException("Region : " + regionName + " is invalid.");
            StsClient stsClient = StsClient.builder().region(region).build();
            AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
                    .roleArn(this.assumeRoleARN)
                    .roleSessionName("kafka-consumer-cross-account-glue-schemaregistry-demo")
                    .externalId(this.externalId)
                    .build();
            AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest);
            Credentials myCreds = roleResponse.credentials();
            System.setProperty("aws.accessKeyId", myCreds.accessKeyId());
            System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey());
            System.setProperty("aws.sessionToken", myCreds.sessionToken());
            stsClient.close();
        } catch (StsException e) {
            logger.error(e.getMessage());
            System.exit(1);
        }
    }

Compile and generate Avro schema classes

Like any other part of building and deploying your application, schema compilation and the process of generating Avro schema classes should be included in your CI/CD pipeline. There are multiple ways to generate Avro schema classes; we use avro-maven-plugin for this post. The CI/CD process can also use avro-tools to compile Avro schema to generate classes. The following code is an example of how you can use avro-tools:

java -jar /path/to/avro-tools-1.10.2.jar compile schema <schema file> <destination>

//compiling unicorn_ride_request.avsc
java -jar avro-tools-1.10.2.jar compile schema unicorn_ride_request.avsc .

Implementation overview

To recap, we start with defining and registering an Avro schema for the unicorn ride request message in the AWS Glue Schema Registry in Account B, the central data lake account. In Account A, we create an MSK cluster and Kafka producer and consumer EC2 instances with their respective application code (kafka-cross-account-gsr-consumer.jar and kafka-cross-account-gsr-producer.jar) and deployed in them using the CloudFormation stack.

When we run the producer application in Account A, the serializer (GlueSchemaRegistryKafkaSerializer) from the AWS Glue Schema Registry SerDe library provided as the configuration gets the unicorn ride request schema (UnicornRideRequest.avsc) from the central Schema Registry residing in Account B to serialize the unicorn ride request message. It uses the IAM role (temporary credentials) in Account B and Region, schema registry name (unicorn-ride-request-registry), and schema name (unicorn-ride-request-schema-avro) provided as the configuration to connect to the central Schema Registry. After the message is successfully serialized, the producer application sends it to the Kafka topic (unicorn-ride-request-topic) on the MSK cluster.

When we run the consumer application in Account A, the deserializer (GlueSchemaRegistryKafkaDeserializer) from the Schema Registry SerDe library provided as the configuration extracts the encoded schema ID from the message read from the Kafka topic (unicorn-ride-request-topic) and gets the schema for the same ID from the central Schema Registry in Account B. It then deserializes the message. It uses the IAM role (temporary credentials) in Account B and the Region provided as the configuration to connect to the central Schema Registry. The consumer application also configures Avro’s SPECIFIC_RECORD to inform the deserializer that the message is of a specific type (unicorn ride request). After the message is successfully deserialized, the consumer application processes it as per the requirements.

Clean up

The final step is to clean up. To avoid unnecessary charges, you should remove all the resources created by the CloudFormation stacks used for this post. The simplest way to do so is to delete the stacks. First delete the MSKClusterStack followed by MSKClientStack from Account A. Then delete the SchemaRegistryStack from Account B.

Conclusion

In this post, we demonstrated how to use AWS Glue Schema Registry with Amazon MSK and stream processing applications to validate messages using an Avro schema. We created a distributed architecture where the Schema Registry resides in a central AWS account (data lake account) and Kafka producer and consumer applications reside in a separate AWS account. We created an Avro schema in the schema registry in the central account to make it efficient for the application teams to maintain schemas in a single place. Because AWS Glue Schema Registry supports identity-based access policies, we used the cross-account IAM role to allow the Kafka producer and consumer applications running in a separate account to securely access the schema from the central account to validate messages. Because the Avro schema was agreed in advance, we used Avro SpecificRecord to ensure type safety at compile time and avoid runtime schema validation issues at the client side. The code used for this post is available on GitHub for reference.

To learn more about the services and resources in this solution, refer to AWS Glue Schema Registry, the Amazon MSK Developer Guide, the AWS Glue Schema Registry SerDe library, and IAM tutorial: Delegate access across AWS accounts using IAM roles.


About the Author

Vikas Bajaj is a Principal Solutions Architect at Amazon Web Service. Vikas works with digital native customers and advises them on technology architecture and modeling, and options and solutions to meet strategic business objectives. He makes sure designs and solutions are efficient, sustainable, and fit-for-purpose for current and future business needs. Apart from architecture and technology discussions, he enjoys watching and playing cricket.

Evolve JSON Schemas in Amazon MSK and Amazon Kinesis Data Streams with the AWS Glue Schema Registry

Post Syndicated from Aditya Challa original https://aws.amazon.com/blogs/big-data/evolve-json-schemas-in-amazon-msk-and-amazon-kinesis-data-streams-with-the-aws-glue-schema-registry/

Data is being produced, streamed, and consumed at an immense rate, and that rate is projected to grow exponentially in the future. In particular, JSON is the most widely used data format across streaming technologies and workloads. As applications, websites, and machines increasingly adopt data streaming technologies such as Apache Kafka and Amazon Kinesis Data Streams, which serve as a highly available transport layer that decouples the data producers from data consumers, it can become progressively more challenging for teams to coordinate and evolve JSON Schemas. Adding or removing a field or changing the data type on one or more existing fields could introduce data quality issues and downstream application failures without careful data handling. Teams rely on custom tools, complex code, tedious processes, or unreliable documentation to protect against these schema changes. This puts heavy dependency on human oversight, which can make the change management processes error-prone. A common solution is a schema registry that enables data producers and consumers to perform validation of schema changes in a coordinated fashion. This allows for risk-free evolution as business demands change over time.

The AWS Glue Schema Registry, a serverless feature of AWS Glue, now enables you to validate and reliably evolve streaming data against JSON Schemas. The Schema Registry is a free feature that can significantly improve data quality and developer productivity. With it, you can eliminate defensive coding and cross-team coordination, reduce downstream application failures, and use a registry that is integrated across multiple AWS services. Each schema can be versioned within the guardrails of a compatibility mode, providing developers the flexibility to reliably evolve JSON Schemas. Additionally, the Schema Registry can serialize data into a compressed format, which helps you save on data transfer and storage costs.

This post shows you how to use the Schema Registry for JSON Schemas and provides examples of how to use it with both Kinesis Data Streams and Apache Kafka or Amazon Managed Streaming for Apache Kafka (Amazon MSK).

Overview of the solution

In this post, we walk you through a solution to store, validate, and evolve a JSON Schema in the AWS Glue Schema Registry. The schema is used by Apache Kafka and Kinesis Data Streams applications while producing and consuming JSON objects. We also show you what happens when a new version of the schema is created with a new field.

The following diagram illustrates our solution workflow:

The steps to implement this solution are as follows:

  1. Create a new registry and register a schema using an AWS CloudFormation template.
  2. Create a new version of the schema using the AWS Glue console that is backward-compatible with the previous version.
  3. Build a producer application to do the following:
    1. Generate JSON objects that adhere to one of the schema versions.
    2. Serialize the JSON objects into an array of bytes.
    3. Obtain the corresponding schema version ID from the Schema Registry and encode the byte array with the same.
    4. Send the encoded byte array through a Kinesis data stream or Apache Kafka topic.
  4. Build a consumer application to do the following:
    1. Receive the encoded byte array through a Kinesis data stream or Apache Kafka topic.
    2. Decode the schema version ID and obtain the corresponding schema from the Schema Registry.
    3. Deserialize the array of bytes into the original JSON object.
    4. Consume the JSON object as needed.

Description of the schema used

For this post, we start with the following schema. The schema is of a weather report object that contains three main pieces of data: location, temperature, and timestamp. All three are required fields, but the schema does allow additional fields (indicated by the additionalProperties flag) such as windSpeed or precipitation if the producer wants to include them. The location field is an object with two string fields: city and state. Both are required fields and the schema doesn’t allow any additional fields within this object.

{
    "$id": "https://example.com/weather-report.schema.json",
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "WeatherReport",
    "type": "object",
    "properties": {
        "location": {
            "type": "object",
            "properties": {
                "city": {
                    "type": "string",
                    "description": "Name of the city where the weather is being reported."
                },
                "state": {
                    "type": "string",
                    "description": "Name of the state where the weather is being reported."
                }
            },
            "additionalProperties": false,
            "required": [
                "city",
                "state"
            ]
        },
        "temperature": {
            "type": "integer",
            "description": "Temperature in Farenheit."
        },
        "timestamp": {
            "description": "Timestamp in epoch format at which the weather was noted.",
            "type": "integer"
        }
    },
    "additionalProperties": true,
    "required": [
        "location",
        "temperature",
        "timestamp"
    ]
}

Using the above schema, a valid JSON object would look like this:

{
    "location": {
        "city": "Phoenix",
        "state": "Arizona"
    },
    "temperature": 115,
    "windSpeed": 50,
    "timestamp": 1627335205
}

Deploy with AWS CloudFormation

For a quick start, you can deploy the provided CloudFormation stack. The CloudFormation template generates the following resources in your account:

  • Registry – A registry is a container of schemas. Registries allow you to organize your schemas, as well as manage access control for your applications. A registry has an Amazon Resource Name (ARN) to allow you to organize and set different access permissions to schema operations within the registry.
  • Schema – A schema defines the structure and format of a data record. A schema is a versioned specification for reliable data publication, consumption, or storage. Each schema can have multiple versions. Versioning is governed by a compatibility rule that is applied on a schema. Requests to register new schema versions are checked against this rule by the Schema Registry before they can succeed.

To manually create these resources without using AWS CloudFormation, refer to Creating a Registry and Creating a Schema.

Prerequisites

Make sure to complete the following steps as prerequisites:

  1. Create an AWS account. For this post, you configure the required AWS resources in the us-east-1 or us-west-2 Region. If you haven’t signed up, complete the following tasks:
    1. Create an account. For instructions, see Sign Up for AWS.
    2. Create an AWS Identity and Access Management (IAM) user. For instructions, see Creating an IAM User in your AWS account.
  2. Choose Launch Stack to launch the CloudFormation stack:

Review the newly registered schema

Let’s review the registry and the schema on the AWS Glue console.

  1. Sign in to the AWS Glue console and choose the appropriate Region.
  2. Under Data Catalog, choose Schema registries.
  3. Choose the GsrBlogRegistry schema registry.
  4. Choose the GsrBlogSchema schema.
  5. Choose Version 1.

We can see the JSON Schema version details and its definition. Note that the compatibility mode chosen is Backward compatibility. We see the purpose of that in the next section.

Evolve the schema by creating a new backward-compatible version

In this section, we take what is created so far and add a new schema version to demonstrate how we can evolve our schema while keeping the integrity intact.

To add a new schema version, complete the following steps, continuing from the previous section:

  1. On the Schema version details page, choose Register new version.
  2. Inside the properties object within the location object (after the state field), add a new country field as follows:
    "country": {
              "type": "string",
              "description": "Name of the country where the weather is being reported."
            }

Because the compatibility mode chosen for the schema is backward compatibility, it’s important that we don’t make this new field a required field. If we do that, the Schema Registry fail this new version.

  1. Choose Register version.

We now have a new version of the schema that allows the producers to include an optional country field within the location object if they choose to.

Use the AWS Glue Schema Registry

In this section, we walk through the steps to use the Schema Registry with Kinesis Data Streams or Apache Kafka.

Prerequisites

Make sure to complete the following steps as prerequisites:

  1. Configure your AWS credentials in your local machine.
  2. Install Maven on the local machine.
  3. Download the application code from the GitHub repo.
  4. Build the package:
    mvn clean package

Use the Schema Registry with Kinesis Data Streams

Run the Kinesis producer code to produce JSON messages that are associated with a schema ID assigned by the Schema Registry:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.gsr.samples.json.kinesis.RunKinesisProducer" -Dexec.args="<<KINESIS_DATA_STREAM_NAME>>"

This command returns the following output:

Putting 1 record into <<KINESIS_DATA_STREAM_NAME>>
Sent message 0
Putting 1 record into <<KINESIS_DATA_STREAM_NAME>>
Sent message 1
Putting 1 record into <<KINESIS_DATA_STREAM_NAME>>
Sent message 2
Successfully produced 3 messages to a stream called <<KINESIS_DATA_STREAM_NAME>>

Run the Kinesis consumer code to receive JSON messages with the schema ID, obtain the schema from the Schema Registry, and validate:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.gsr.samples.json.kinesis.RunKinesisConsumer" -Dexec.args="<<KINESIS_DATA_STREAM_NAME>>"

This command returns the following output with the JSON records received and decoded:

Number of Records received: 1
[JsonDataWithSchema(schema={"$schema":"http://json-schema.org/draft-07/schema#","additionalProperties":true,"title":"WeatherReport","type":"object","properties":{"temperature":{"description":"Temperature in Farenheit.","type":"integer"},"location":{"additionalProperties":false,"type":"object","properties":{"city":{"description":"Name of the city where the weather is being reported.","type":"string"},"state":{"description":"Name of the state where the weather is being reported.","type":"string"}},"required":["city","state"]},"timestamp":{"description":"Timestamp in epoch format at which the weather was noted.","type":"integer"}},"required":["location","temperature","timestamp"],"$id":"https://example.com/weather-report.schema.json"}, payload={"temperature":89,"location":{"city":"Orlando","state":"Florida"},"timestamp":1627335205})]

Use the Schema Registry with Apache Kafka

In the root of the downloaded GitHub repo folder, create a config file with the connection parameters for the Kafka cluster:

# Kafka
bootstrap.servers=localhost:9092

Run the Kafka producer code to produce JSON messages that are associated with a schema ID assigned by the Schema Registry:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.gsr.samples.json.kafka.RunKafkaProducer" -Dexec.args="<<CONFIG_FILE_NAME>><< TOPIC_NAME>>"

This command returns the following output:

Sent message 0
Sent message 1
Sent message 2
Successfully produced 3 messages to a topic called <<TOPIC_NAME>>

Run the Kafka consumer code to consume JSON messages with the schema ID, obtain the schema from the Schema Registry, and validate:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.gsr.samples.json.kafka.RunKafkaConsumer" -Dexec.args="<<CONFIG_FILE_NAME>> <<TOPIC_NAME>>"

This command returns the following output with the JSON records received and decoded:

Received message: key = message-0, value = JsonDataWithSchema(schema={"$schema":"http://json-schema.org/draft-07/schema#","additionalProperties":true,"title":"WeatherReport","type":"object","properties":{"temperature":{"description":"Temperature in Farenheit.","type":"integer"},"location":{"additionalProperties":false,"type":"object","properties":{"city":{"description":"Name of the city where the weather is being reported.","type":"string"},"state":{"description":"Name of the state where the weather is being reported.","type":"string"}},"required":["city","state"]},"timestamp":{"description":"Timestamp in epoch format at which the weather was noted.","type":"integer"}},"required":["location","temperature","timestamp"],"$id":"https://example.com/weather-report.schema.json"}, payload={"temperature":115,"location":{"city":"Phoenix","state":"Arizona"},"windSpeed":50,"timestamp":1627335205})

Clean up

Now to the final step, cleaning up the resources. Delete the CloudFormation stack to remove any resources you created as part of this walkthrough.

Schema Registry features

Let’s discuss the features the Schema Registry has to offer:

  • Schema discovery – When a producer registers a schema change, metadata can be applied as a key-value pair to provide searchable information for administrators or developers. This metadata can indicate the original source of the data (source=MSK_west), the team’s name to contact (owner=DataEngineering), or AWS tags (environment=Production). You could potentially encrypt a field in your data on the producing client and use metadata to specify to potential consumer clients which public key fingerprint to use for decryption.
  • Schema compatibility – The versioning of each schema is governed by a compatibility mode. If a new version of a schema is requested to be registered that breaks the specified compatibility mode, the request fails, and an exception is thrown. Compatibility checks enable developers building downstream applications to have a bounded set of scenarios to build applications against, which helps prepare for the changes without issue. Commonly used modes are FORWARD, BACKWARD, and FULL. For more information about mode definitions, see Schema Versioning and Compatibility.
  • Schema validation – Schema Registry serializers work to validate that the data produced is compatible with the assigned schema. If it isn’t, the data producer receives an exception from the serializer. This ensures that potentially breaking changes are found earlier in development cycles and can also help prevent unintentional schema changes due to human error.
  • Auto-registration of schemas – If configured to do so, the data producer can auto-register schema changes as they flow in the data stream. This is especially helpful for use cases where the source of the data is generated by a change data capture process (CDC) from the database.
  • IAM support – Due to integrated IAM support, only authorized producers can change certain schemas. Furthermore, only those consumers authorized to read the schema can do so. Schema changes are typically performed deliberately and with care, so it’s important to use IAM to control who performs these changes. Additionally, access control to schemas is important in situations where you might have sensitive information included in the schema definition itself. In the previous examples, IAM roles are inferred via the AWS SDK for Java, so they are inherited from the Amazon Elastic Compute Cloud (Amazon EC2) instance’s role that the application runs on, if using Amazon EC2. You can also apply IAM roles to any other AWS service that could contain this code, such as containers or AWS Lambda functions.
  • Secondary deserializer – If you have already registered schemas in another schema registry, there’s an option for specifying a secondary deserializer when performing schema lookups. This allows for migrations from other schema registries without having to start all over again. Any schema ID that is unknown to the Schema Registry is looked up in the registry tied to the secondary deserializer.
  • Compression – Using a schema registry can reduce data payload by no longer needing to send and receive schemas with each message. Schema Registry libraries also provide an option for zlib compression, which can reduce data requirements even further by compressing the payload of the message. This varies by use case, but compression can reduce the size of the message significantly.
  • Multiple data formats – The Schema Registry currently supports AVRO (v1.10.2) data format, JSON data format with JSON Schema format for the schema (specifications Draft-04, Draft-06, and Draft-07), and Java language support, with other data formats and languages to come.

Conclusion

In this post, we discussed the benefits of using the AWS Glue Schema Registry to register, validate, and evolve JSON Schemas for data streams as business needs change. We also provided examples of how to use the Schema Registry.

Learn more about Integrating with AWS Glue Schema Registry.


About the Author

Aditya Challa is a Senior Solutions Architect at Amazon Web Services. Aditya loves helping customers through their AWS journeys because he knows that journeys are always better when there’s company. He’s a big fan of travel, history, engineering marvels, and learning something new every day.

Handle fast-changing reference data in an AWS Glue streaming ETL job

Post Syndicated from Jerome Rajan original https://aws.amazon.com/blogs/big-data/handle-fast-changing-reference-data-in-an-aws-glue-streaming-etl-job/

Streaming ETL jobs in AWS Glue can consume data from streaming sources such as Amazon Kinesis and Apache Kafka, clean and transform those data streams in-flight, as well as continuously load the results into Amazon Simple Storage Service (Amazon S3) data lakes, data warehouses, or other data stores.

The always-on nature of streaming jobs poses a unique challenge when handling fast-changing reference data that is used to enrich data streams within the AWS Glue streaming ETL job. AWS Glue processes real-time data from Amazon Kinesis Data Streams using micro-batches. The foreachbatch method used to process micro-batches handles one data stream.

This post proposes a solution to enrich streaming data with frequently changing reference data in an AWS Glue streaming ETL job.

You can enrich data streams with changing reference data in the following ways:

  • Read the reference dataset with every micro-batch, which can cause redundant reads and an increase in read requests. This approach is expensive, inefficient, and isn’t covered in this post.
  • Design a method to tell the AWS Glue streaming job that the reference data has changed and refresh it only when needed. This approach is cost-effective and highly available. We recommend using this approach.

Solution overview

This post uses DynamoDB Streams to capture changes to reference data, as illustrated in the following architecture diagram. For more information about DynamoDB Streams, see DynamoDB Streams Use Cases and Design Patterns.

The workflow contains the following steps:

  1. A user or application updates or creates a new item in the DynamoDB table.
  2. DynamoDB Streams is used to identify changes in the reference data.
  3. A Lambda function is invoked every time a change occurs in the reference data.
  4. The Lambda function captures the event containing the changed record, creates a “change file” and places it in an Amazon S3 bucket.
  5. The AWS Glue job is designed to monitor the stream for this value in every micro-batch. The moment that it sees the change flag, AWS Glue initiates a refresh of the DynamoDB data before processing any further records in the stream.

This post is accompanied by an AWS CloudFormation template that creates resources as described in the solution architecture:

  • A DynamoDB table named ProductPriority with a few items loaded
  • An S3 bucket named demo-bucket-<AWS AccountID>
  • Two Lambda functions:
    • demo-glue-script-creator-lambda
    • demo-reference-data-change-handler
  • A Kinesis data stream named SourceKinesisStream
  • An AWS Glue Data Catalog database called my-database
  • Two Data Catalog tables
  • An AWS Glue job called demo-glue-job-<AWS AccountID>. The code for the AWS Glue job can be found at this link.
  • Two AWS Identity and Access Management (IAM) roles:
    • A role for the Lambda functions to access Kinesis, Amazon S3, and DynamoDB Streams
    • A role for the AWS Glue job to access Kinesis, Amazon S3, and DynamoDB
  • An Amazon Kinesis Data Generator (KDG) account with a user created through Amazon Cognito to generate a sample data stream

Prerequisites

For this walkthrough, you should have the following prerequisites:

  • An AWS account
  • The IAM user should have permissions to create the required roles
  • Permission to create a CloudFormation stack and the services we detailed

Create resources with AWS CloudFormation

To deploy the solution, complete the following steps:

  1. Choose Launch Stack:
  2. Set up an Amazon Cognito user pool and test if you can access the KDG URL specified in the stack’s output tab. Furthermore, validate if you can log in to KDG using the credentials provided while creating the stack.

You should now have the required resources available in your AWS account.

  1. Verify this list with the resources in the output section of the CloudFormation stack.

Sample data

Sample reference data has already been loaded into the reference data store. The following screenshot shows an example.

The priority value may change frequently based on the time of the day, the day of the week, or other factors that drive demand and supply.

The objective is to accommodate these changes to the reference data seamlessly into the pipeline.

Generate a randomized stream of events into Kinesis

Next, we simulate a sample stream of data into Kinesis. For detailed instructions, see Test Your Streaming Data Solution with the New Amazon Kinesis Data Generator. For this post, we define the structure of the simulated orders data using a parameterized template.

  1. On the KDG console, choose the Region where the source Kinesis stream is located.
  2. Choose your delivery stream.
  3. Enter the following template into the Record template field:
    {
    "dish": "{{random.arrayElement(["pizza","burger","salad","donut","ice-cream"])}}"
    ,"cost": {{random.number({"min":10,"max":150})}}
    ,"customer_id":{{random.number({"min":1,"max":10000})}}
    }

  4. Choose Test template, then choose Send data.

KDG should start sending a stream of randomly generated orders to the Kinesis data stream.

Run the AWS Glue streaming job

The CloudFormation stack created an AWS Glue job that reads from the Kinesis data stream through a Data Catalog table, joins with the reference data in DynamoDB, and writes the result to an S3 bucket. To run the job, complete the following steps:

  1. On the AWS Glue console, under ETL in the navigation pane, choose Jobs.
  2. Select the job demo-glue-job-<AWS AccountID>.
  3. On the Actions menu, choose Run job.

In addition to the enrichment, the job includes an additional check that monitors an Amazon S3 prefix for a “Change Flag” file. This file is created by the Lambda function, which is invoked by the DynamoDB stream whenever there is an update or a new reference item.

Investigate the target data in Amazon S3

The following is a screenshot of the data being loaded in real time into the item=burger partition. The priority was set to medium in the reference data, and the orders go into the corresponding partition.

Update the reference data

Now we update the priority for burgers to high in the DynamoDB table through the console while the orders are streaming into the pipeline.

Use the following command to perform the update through Amazon CloudShell. Change the Region to the appropriate value.

aws dynamodb update-item --table-name "ProductPriority" --key '{"item":{"S":"burger"}, "price":{"N":"100"}}' --update-expression "SET priority = :s" --expression-attribute-values '{":s": {"S": "high"}}' --return-values ALL_NEW --region us-east-1

Verify that the data got updated.

Navigate to the target S3 folder to confirm the contents. The AWS Glue job should have started sending the orders for burgers into the high partition.

The Lambda function is invoked by the DynamoDB stream and places a “Change Flag” file in an Amazon S3 bucket. The AWS Glue job refreshes the reference data and deletes the file to avoid redundant refreshes.

Using this pattern for reference data in Amazon S3

If the reference data is stored in an S3 bucket, create an Amazon S3 event notification that identifies changes to the prefix where the reference data is stored. The event notification invokes a Lambda function that inserts the change flag into the data stream.

Cleaning up

To avoid incurring future charges, delete the resources. You can do this by deleting the CloudFormation stack.

Conclusion

In this post, we discussed  approaches to handle fast-changing reference data stored in DynamoDB or Amazon S3. We demonstrated a simple use case that implements this pattern.

Note that DynamoDB Streams writes stream records in near-real time. When designing your solution, account for a minor delay between the actual update in DynamoDB and the write into the DynamoDB stream.


About the Authors

Jerome Rajan is a Lead Data Analytics Consultant at AWS. He helps customers design & build scalable analytics solutions and migrate data pipelines and data warehouses into the cloud. In an alternate universe, he is a World Chess Champion!

Dipankar Ghosal is a Principal Architect at Amazon Web Services and is based out of Minneapolis, MN. He has a focus in analytics and enjoys helping customers solve their unique use cases. When he’s not working, he loves going hiking with his wife and daughter.

Securely share your data across AWS accounts using AWS Lake Formation

Post Syndicated from Yumiko Kanasugi original https://aws.amazon.com/blogs/big-data/securely-share-your-data-across-aws-accounts-using-aws-lake-formation/

Data lakes have become very popular with organizations that want a centralized repository that allows you to store all your structured data and unstructured data at any scale. Because data is stored as is, there is no need to convert it to a predefined schema in advance. When you have new business use cases, you can easily build new types of analyses on top of the data lake, at any time.

In real-world use cases, it’s common to have requirements to share data stored within the data lake with multiple companies, organizations, or business units. For example, you may want to provide your data to stakeholders in another company for a co-marketing campaign between the two companies. For any of these use cases, the producer party wants to share data in a secure and effective manner, without having to copy the entire database.

In August 2019, we announced the general availability of AWS Lake Formation, a fully managed service that makes it easy to set up a secure data lake in days. AWS Lake Formation permission management capabilities simplify securing and managing distributed data lakes across multiple AWS accounts through a centralized approach, providing fine-grained access control to the AWS Glue Data Catalog and Amazon Simple Storage Service (Amazon S3) locations.

There are two options to share your databases and tables with another account by using Lake Formation cross-account access control:

  • Lake Formation tag-based access control (recommended)
  • Lake Formation named resources

In this post, I explain the differences between these two options, and walk you through the steps to configure cross-account sharing.

Overview of tag-based access control

Lake Formation tag-based access control is an authorization strategy that defines permissions based on attributes. In Lake Formation, these attributes are called LF-tags. You can attach LF-tags to Data Catalog resources and Lake Formation principals. Data lake administrators can assign and revoke permissions on Lake Formation resources using these LF-tags. For more details about tag-based access control, refer to Easily manage your data lake at scale using AWS Lake Formation Tag-based access control.

The following diagram illustrates the architecture of this method.

We recommend tag-based access control for the following use cases:

  • You have a large number of tables and principals that the data lake administrator has to grant access to
  • You want to classify your data based on an ontology and grant permissions based on classification
  • The data lake administrator wants to assign permissions dynamically, in a loosely coupled way

You can also use tag-based access control to share Data Catalog resources (databases, tables, and columns) with external AWS accounts.

Overview of named resources

The Lake Formation named resource method is an authorization strategy that defines permissions for resources. Resources include databases, tables, and columns. Data lake administrators can assign and revoke permissions on Lake Formation resources. See Cross-Account Access: How It Works for details.

The following diagram illustrates the architecture for this method.

We recommend using named resources if the data lake administrator prefers granting permissions explicitly to individual resources.

When you use the named resource method to grant Lake Formation permissions on a Data Catalog resource to an external account, Lake Formation uses AWS Resource Access Manager (AWS RAM) to share the resource.

Now, let’s take a closer look at how to configure cross-account access with these two options. We refer to the account that has the source table as the producer account, and refer to the account that needs access to the source table as consumer account.

Configure Lake Formation Data Catalog settings in the producer account

Lake Formation provides its own permission management model. To maintain backward compatibility with the AWS Identity and Access Management (IAM) permission model, the Super permission is granted to the group IAMAllowedPrincipals on all existing AWS Glue Data Catalog resources by default. Also, Use only IAM access control settings are enabled for new data catalog resources.

In this post, we do fine grained access control using Lake Formation permissions and use IAM policies for coarse grained access control. See Methods for Fine-Grained Access Control for details. Therefore, before you use an AWS CloudFormation template for a quick setup, you need to change Lake Formation Data Catalog settings in the producer account.

This setting affects all newly created databases and tables, so we strongly recommend completing this tutorial in a non-production or new account. Also, if you’re using a shared account (such as your company’s dev account), make sure it doesn’t affect others resources. If you prefer to keep the default security settings, you must complete an extra step when sharing resources to other accounts, in which you revoke the default Super permission from IAMAllowedPrincipals on the database or table. We discuss the details later in this post.

To configure Lake Formation Data Catalog settings in the producer account, complete the following steps:

  1. Sign in to the producer account as an admin user, or a user with Lake Formation PutDataLakeSettings API permission.
  2. On the Lake Formation console, in the navigation pane, under Data catalog, choose Settings.
  3. Deselect Use only IAM access control for new databases and Use only IAM access control for new tables in new databases
  4. Choose Save.

Additionally, you can remove CREATE_DATABASE permissions for IAMAllowedPrincipals under Administrative roles and tasks > Database creators. Only then, who can create a new database is governed through Lake Formation permissions.

Set up resources with AWS CloudFormation

We provide two CloudFormation templates in this post: one for the producer account, and one for the consumer account.

The CloudFormation template for the producer account generates the following resources:

  • An S3 bucket to serve as our data lake.
  • A Lambda function (for Lambda-backed AWS CloudFormation custom resources). We use the function to copy sample data files from the public S3 bucket to your S3 bucket.
  • IAM users and policies:
    • DataLakeAdminProducer
  • An AWS Glue Data Catalog database, table, and partition. Because we introduce two options for sharing resources across AWS accounts, this template creates two separate sets of database and table.
  • Lake Formation data lake settings and permissions. This includes:

The CloudFormation template for the consumer account generates the following resources:

  • IAM users and policies:
    • DataLakeAdminConsumer
    • DataAnalyst
  • An AWS Glue Data Catalog database. We use this database for creating resource links to shared resources.

Launch the CloudFormation stack in the producer account

To launch the CloudFormation stack in the producer account, complete the following steps:

  1. Sign in to the producer account’s AWS CloudFormation console in the target Region.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For Stack name, enter a stack name, such as stack-producer.
  5. For ProducerDatalakeAdminUserName and ProducerDatalakeAdminUserPassword, enter the user name and password you want for the data lake admin IAM user.
  6. For DataLakeBucketName, enter the name of your data lake bucket. This name needs to be globally unique.
  7. For DatabaseName and TableName, leave the default values.
  8. Choose Next.
  9. On the next page, choose Next.
  10. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  11. Choose Create stack.

Launch the CloudFormation stack in the consumer account

To launch the CloudFormation stack in the consumer account, complete the following steps:

  1. Sign in to the consumer account’s AWS CloudFormation console in the target Region.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For Stack name, enter a stack name, such as stack-consumer.
  5. For ConsumerDatalakeAdminUserName and ConsumerDatalakeAdminUserPassword, enter the user name and password you want for the data lake admin IAM user.
  6. For DataAnalystUserName and DataAnalystUserPassword, enter the user name and password you want for the data analyst IAM user.
  7. For DatabaseName, leave the default values.
  8. For AthenaQueryResultS3BucketName, enter the name of the S3 bucket that stores Amazon Athena query results. If you don’t have one, create an S3 bucket.
  9. Choose Next.
  10. On the next page, choose Next.
  11. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  12. Choose Create stack.

Stack creation can take about 1 minute.

(Optional) AWS KMS server-side encryption

If the source S3 bucket is encrypted using server-side encryption with an AWS Key Management Service (AWS KMS) customer master key (CMK), make sure the IAM role that Lake Formation uses to access S3 data is registered as the key user for the KMS CMK. By default, the IAM role AWSServiceRoleForLakeFormationDataAccess is used, but you can choose other IAM roles when registering an S3 data lake location. To register the Lake Formation role as the KMS key user, you can use the AWS KMS console, or directly add the permission to the key policy using the KMS PutKeyPolicy API and the AWS Command Line Interface (AWS CLI).

You don’t have to add individual consumer accounts to the key policy. Only the role that Lake Formation uses is required. Also, this step isn’t necessary if the source S3 bucket is encrypted with server-side encryption with Amazon S3, or an AWS managed key.

To add a Lake Formation role as the KMS key user via the console, complete the following steps:

  1. Sign in to the AWS KMS console as the key administrator.
  2. In the navigation pane, under Customer managed keys, choose the key that is used to encrypt the source S3 bucket.
  3. Under Key users, choose Add.
  4. Select AWSServiceRoleForLakeFormationDataAccess and choose Add.

To use the AWS CLI, enter the following command (replace <key-id>, <name-of-key-policy>, and <key-policy> with valid values):

aws kms put-key-policy --key-id <key-id> --policy-name <name-of-key-policy> --policy <key-policy>

For more information, see put-key-policy.

Lake Formation cross-account sharing prerequisites

Before sharing resources with Lake Formation, there are prerequisites for both the tag-based access control method and named resource method.

Tag-based access control cross-account sharing prerequisites

As described in Lake Formation Tag-Based Access Control Cross-Account Prerequisites, before you can use the tag-based access control method to grant cross-account access to resources, you must add the following JSON permissions object to the AWS Glue Data Catalog resource policy in the producer account. This gives the consumer account permission to access the Data Catalog when glue:EvaluatedByLakeFormationTags is true. Also, this condition becomes true for resources on which you granted permission using Lake Formation permission Tags to the consumer’s account. This policy is required for every AWS account that you’re granting permissions to.

The following policy must be within a Statement element. We discuss the full IAM policy later in this post.

{
    "Effect": "Allow",
    "Action": [
        "glue:*"
    ],
    "Principal": {
        "AWS": [
            "<consumer-account-id>"
        ]
    },
    "Resource": [
        "arn:aws:glue:<region>:<account-id>:table/*",
        "arn:aws:glue:<region>:<account-id>:database/*",
        "arn:aws:glue:<region>:<account-id>:catalog"
    ],
    "Condition": {
        "Bool": {
            "glue:EvaluatedByLakeFormationTags": true
        }
    }
}

Named resource method cross-account sharing prerequisites

As described in Managing Cross-Account Permissions Using Both AWS Glue and Lake Formation, if there is no Data Catalog resource policy in your account, the Lake Formation cross-account grants that you make proceed as usual. However, if a Data Catalog resource policy exists, you must add the following statement to it to permit your cross-account grants to succeed if they’re made with the named resource method. If you plan to use only the named resource method, or only the tag-based access control method, you can skip this step. In this post, we evaluate both methods, so we need to add the following policy.

The following policy must be within a Statement element. We discuss the full IAM policy in the next section.

{
    "Effect": "Allow",
    "Action": [
        "glue:ShareResource"
    ],
    "Principal": {
        "Service": [
            "ram.amazonaws.com"
        ]
    },
    "Resource": [
        "arn:aws:glue:<region>:<account-id>:table/*/*",
        "arn:aws:glue:<region>:<account-id>:database/*",
        "arn:aws:glue:<region>:<account-id>:catalog"
    ]
}

Add the AWS Glue Data Catalog resource policy using the AWS CLI

If we grant cross-account permissions by using both the tag-based access control method and named resource method, we must set the EnableHybrid argument to ‘true’ when adding the preceding policies. Because this option isn’t currently supported on the console, we must use the glue:PutResourcePolicy API and AWS CLI.

First, create a policy document (such as policy.json) and add the preceding two policies. Replace <consumer-account-id> with the account ID of the AWS account receiving the grant, <region> with the Region of the Data Catalog containing the databases and tables that you are granting permissions on, and <account-id> with the producer AWS account ID.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "ram.amazonaws.com"
            },
            "Action": "glue:ShareResource",
            "Resource": [
                "arn:aws:glue:<region>:<account-id>:table/*/*",
                "arn:aws:glue:<region>:<account-id>:database/*",
                "arn:aws:glue:<region>:<account-id>:catalog"
            ]
        },
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "<consumer-account-id>"
            },
            "Action": "glue:*",
            "Resource": [
                "arn:aws:glue:<region>:<account-id>:table/*/*",
                "arn:aws:glue:<region>:<account-id>:database/*",
                "arn:aws:glue:<region>:<account-id>:catalog"
            ],
            "Condition": {
                "Bool": {
                    "glue:EvaluatedByLakeFormationTags": "true"
                }
            }
        }
    ]
}

Enter the following AWS CLI command. Replace <glue-resource-policy> with the correct values (such as file://policy.json).

aws glue put-resource-policy --policy-in-json <glue-resource-policy> --enable-hybrid TRUE

For more information, see put-resource-policy.

Implement the Lake Formation tag-based access control method

In this section, we walk through the following high-level steps:

  1. Define an LF-tag.
  2. Assign the LF-tag to the target resource.
  3. Grant LF-tag permissions to the consumer account.
  4. Grant data permissions to the consumer account.
  5. Optionally, revoke permissions for IAMAllowedPrincipals on the database, tables, and columns.
  6. Create a resource link to the shared table.
  7. Create an LF-tag and assign it to the target database.
  8. Grant LF-tag data permissions to the consumer account.

Define an LF-tag

If you’re signed in to your producer account, sign out before completing the following steps.

  1. Sign in as the producer account data lake administrator. Use the producer account ID, IAM user name (the default is DatalakeAdminProducer), and password that you specified during CloudFormation stack creation.
  2. On the Lake Formation console, in the navigation pane, under Permissions, and under Administrative roles and tasks, choose LF-tags.
  3. Choose Add LF-tag.
  4. Specify the key and values. In this post, we create an LF-tag where the key is Confidentiality and the values are private, sensitive, and public.
  5. Choose Add LF-tag.

Assign the LF-tag to the target resource

As a data lake administrator, you can attach tags to resources. If you plan to use a separate role, you may have to grant describe and attach permissions to the separate role.

  1. In the navigation pane, under Data catalog, select Databases.
  2. Select the target database (lakeformation_tutorial_cross_account_database_tbac) and on the Actions menu, choose Edit LF-tags.

For this post, we assign an LF-tag to a database, but you can also assign LF-tags to tables and columns.

  1. Choose Assign new LF-Tag.
  2. Add the key Confidentiality and value public.
  3. Choose Save.

Grant LF-tag permission to the consumer account

Still in the producer account, we grant permissions to the consumer account to access the LF-tag.

  1. In the navigation pane, under Permissions, Administrative roles and tasks, LF-tag permissions, choose Grant.
  2. For Principals, choose External accounts.
  3. Enter the target AWS account ID.

AWS accounts within the same organization appear automatically. Otherwise, you have to manually enter the AWS account ID. As of this writing, Lake Formation tag-based access control doesn’t support granting permission to organizations or organization units.

  1. For LF-Tags, choose the key and values of the LF-tag that is being shared with the consumer account (key Confidentiality and value public).
  2. For Permissions, select Describe for LF-tag permissions.

LF-tag permissions are permissions given to the consumer account. Grantable permissions are permissions that the consumer account can grant to other principals.

  1. Choose Grant.

At this point, the consumer data lake administrator should be able to find the policy tag being shared via the consumer account Lake Formation console, under Permissions, Administrative roles and tasks, LF-tags.

Grant data permission to the consumer account

We will now provide data access to the consumer account by specifying an LF-Tag expression and granting the consumer account access to any table or database that matches the expression.

  1. In the navigation pane, under Permissions, Data lake permissions, choose Grant.
  2. For Principals, choose External accounts, and enter the consumer AWS account ID.
  3. For LF-tags or catalog resources, under Resources matched by LF-Tags (recommended), choose Add LF-Tag.
  4. Select the key and values of the tag that is being shared with the consumer account (key Confidentiality and value public).
  5. For Database permissions, select Describe under Database permissions to grant access permissions at the database level.
  6. Select Describe under Grantable permissions so the consumer account can grant database-level permissions to its users.
  7. For Table and column permissions, select Select and Describe under Table permissions.
  8. Select Select and Describe under Grantable permissions.
  9. Choose Grant.

Revoke permission for IAMAllowedPrincipals on the database, tables, and columns (Optional)

At the very beginning of this tutorial, we changed the Lake Formation Data Catalog settings. If you skipped that part, this step is required. If you changed your Lake Formation Data Catalog settings, you can skip this step.

In this step, we have to revoke the default Super permission from IAMAllowedPrincipals on the database or table. See Secure Existing Data Catalog Resources for details.

Before revoking permission for IAMAllowedPrincipals, make sure that you granted existing IAM principals with necessary permission through Lake Formation. This includes two steps:

  1. Add IAM permission to the target IAM user or role with the Lake Formation GetDataAccess action (with IAM policy).
  2. Grant the target IAM user or role with Lake Formation data permissions (alter, select, and so on)

Then, revoke permissions for IAMAllowedPrincipals. Otherwise, after revoking permissions for IAMAllowedPrincipals, existing IAM principals may no longer be able to access the target database or catalog.

Revoking Super permission for IAMAllowedPrincipals is required when you want to apply the Lake Formation permission model (instead of the IAM policy model) to manage user access within a single account or among multiple accounts using the Lake Formation permission model. You don’t have to revoke permission of IAMAllowedPrincipals for other tables where you want to keep the traditional IAM policy model.

At this point, the consumer account data lake administrator should be able to find the database and table being shared via the consumer account Lake Formation console, under Data catalog, Databases. If not, confirm if the following are properly configured:

  • Make sure the correct policy tag and values are assigned to the target databases and tables
  • Make sure the correct tag permission and data permission are assigned to the consumer account
  • Revoke the default super permission from IAMAllowedPrincipals on the database or table

Create a resource link to the shared table

When a resource is shared between accounts, the shared resources are not put in the consumer accounts’ catalog. To make them available, and query the underlying data of a shared table using services like Athena, we need to create a resource link to the shared table. A resource link is a Data Catalog object that is a link to a local or shared database or table. By creating a resource link, you can:

  • Assign a different name to a database or table that aligns with your Data Catalog resource naming policies
  • Use services such as Athena and Amazon Redshift Spectrum to query shared databases or tables

To create a resource link, complete the following steps:

  1. If you’re signed in to your consumer account, sign out.
  2. Sign in as the consumer account data lake administrator. Use the consumer account ID, IAM user name (default DatalakeAdminConsumer) and password that you specified during CloudFormation stack creation.
  3. On the Lake Formation console, in the navigation pane, under Data catalog, Databases, choose the shared database lakeformation_tutorial_cross_account_database_tbac.

If you don’t see the database, revisit the previous steps to see if everything is properly configured.

  1. Choose View tables.
  2. Choose the shared table amazon_reviews_table_tbac.
  3. On the Actions menu, choose Create resource link.
  4. For Resource link name, enter a name (for this post, amazon_reviews_table_tbac_resource_link).
  5. Under Database, select the database that the resource link is created in (for this post, the CloudFormation stack created the database lakeformation_tutorial_cross_account_database_consumer).
  6. Choose Create.

The resource link appears under Data catalog, Tables.

Create an LF-tag and assign it to the target database

Lake Formation tags reside in the same catalog as the resources. This means that tags created in the producer account aren’t available to use when granting access to the resource links in the consumer account. You need to create a separate set of LF-tags in the consumer account to use LF tag-based access control when sharing the resource links in the consumer account. Let’s first create the LF-tag. Refer to the previous sections for full instructions.

  1. Define the LF-tag in the consumer account. For this post, we use key Division and values sales, marketing, and analyst.
  2. Assign the LF-tag key Division and value analyst to the database lakeformation_tutorial_cross_account_database_consumer, where the resource link is created in.

Grant LF-tag data permission to the consumer

As a final step, we grant LF-tag data permission to the consumer.

  1. In the navigation pane, under Permissions, Data lake permissions, choose Grant.
  2. For Principals, choose IAM users and roles, and choose the user DataAnalyst.
  3. For LF-tags or catalog resources, choose Resources matched by LF-tags (recommended).
  4. Choose key Division and value analyst.
  5. For Database permissions, select Describe under Database permissions.
  6. For Table and column permissions, select Select and Describe under Table permissions.
  7. Choose Grant.
  8. Repeat these steps for user DataAnalyst, where the LF-tag key is Confidentiality and value is public.

At this point, the data analyst user in the consumer account should be able to find the database and resource link, and query the shared table via the Athena console.

If not, confirm if the following are properly configured:

  • Make sure the resource link is created for the shared table
  • Make sure you granted the user access to the LF-tag shared by the producer account
  • Make sure you granted the user access to the LF-tag associated to the resource link and database that the resource link is created in
  • Check if you assigned the correct LF-tag to the resource link, and to the database that the resource link is created in

Implement the Lake Formation named resource method

To use the named resource method, we walk through the following high-level steps:

  1. Optionally, revoke permission for IAMAllowedPrincipals on the database, tables, and columns.
  2. Grant data permission to the consumer account.
  3. Accept a resource share from AWS RAM.
  4. Create a resource link for the shared table.
  5. Grant data permission for the shared table to the consumer.
  6. Grant data permission for the resource link to the consumer.

Revoke permission for IAMAllowedPrincipals on the database, tables, and columns (Optional)

At the very beginning of this tutorial, we changed Lake Formation Data Catalog settings. If you skipped that part, this step is required. For instructions, see the optional step in the previous section.

Grant data permission to the consumer account

If you’re signed in to producer account as another user, sign out first.

  1. Sign in as the producer account data lake administrator using the AWS account ID, IAM user name (default is DatalakeAdminProducer), and password specified during CloudFormation stack creation.
  2. In the navigation pane, under Permissions, Data lake permissions, choose Grant.
  3. For Principals, choose External accounts, and enter one or more AWS account IDs or AWS Organizations IDs.

Organizations that the producer account belongs to and AWS accounts within the same organization appear automatically. Otherwise, manually enter the account ID or organization ID.

  1. For LF-tags or catalog resources, choose Named data catalog resources.
  2. Under Databases, choose the database lakeformation_tutorial_cross_account_database_named_resource.
  3. Under Tables, choose All tables.
  4. For Table and column permissions, select Select and Describe under Table permissions.
  5. Select Select and Describe under Grantable permissions.
  6. Optionally, for Data permissions, select Simple column-based access if column-level permission management is required.
  7. Choose Grant.

If you haven’t revoked permission for IAMAllowedPrincipals, you get a Grant permissions failed error.

At this point, you should see the target table being shared via AWS RAM with the consumer account under Permissions, Data permissions.

Accept a resource share from AWS RAM

This step is required only for account ID-based sharing, not for organization-based sharing.

  1. Sign in as the consumer account data lake administrator using the IAM user name (default is DatalakeAdminConsumer) and password specified during CloudFormation stack creation.
  2. On the AWS RAM console, in the navigation pane, under Shared with me, Resource shares, choose the shared Lake Formation resource.

The Status should be Pending.

  1. Confirm the resource details, and choose Accept resource share.

At this point, the consumer account data lake administrator should be able to find the shared resource on the Lake Formation console under Data catalog, Databases.

Create a resource link for the shared table

Follow the instructions detailed earlier to create a resource link for a shared table. Name the resource link amazon_reviews_table_named_resource_resource_link. Create the resource link in the database lakeformation_tutorial_cross_account_database_consumer.

Grant data permission for the shared table to the consumer

To grant data permission for the shared table to the consumer, complete the following steps:

  1. In the navigation pane, under Permissions, Data lake permissions, choose Grant.
  2. For Principals, choose IAM users and roles, and choose the user DataAnalyst.
  3. For LF-tags or catalog resources, choose Named data catalog resources.
  4. Under Databases, choose the database lakeformation_tutorial_cross_account_database_named_resource.

If you don’t see the database on the drop-down list, choose Load more.

  1. Under Tables, choose the table amazon_reviews_table_named_resource.
  2. For Table and column permissions, select Select and Describe under Table permissions.
  3. Choose Grant.

Grant data permission for the resource link to the consumer

In addition to granting the data lake user permission to access the shared table, you also need to grant the data lake user permission to access the resource link.

  1. In the navigation pane, under Permissions, Data lake permissions, choose Grant.
  2. For Principals, choose IAM users and roles, and choose the user DataAnalyst.
  3. For LF-tags or catalog resources, choose Named data catalog resources.
  4. Under Databases, choose the database lakeformation_tutorial_cross_account_database_consumer.
  5. Under Tables, choose the table amazon_reviews_table_named_resource_resource_link.
  6. For Resource link permissions, select Describe under Resource link permissions.
  7. Choose Grant.

At this point, the data analyst user in the consumer account should be able to find the database and resource link, and query the shared table via the Athena console.

If not, confirm if the following are properly configured:

  • Make sure the resource link is created for the shared table
  • Make sure you granted the user access to the table shared by the producer account
  • Make sure you granted the user access to the resource link and database that the resource link is created in

Clean up

To clean up the resources created within this tutorial, delete or change the following resources:

  • Producer account:
    • AWS RAM resource share
    • Lake Formation tags
    • CloudFormation stack
    • Lake Formation settings
    • AWS Glue Data Catalog settings
  • Consumer account:
    • Lake Formation tags
    • CloudFormation stack

Summary

Lake Formation cross-account sharing enables you to share data across AWS accounts without copying the actual data. Also, it provides both the producer and consumer with control over data permissions in a flexible way. In this post, we introduced two different options to reference catalog data from another account by using the cross-account access features provided by Lake Formation:

  • Tag-based access control
  • Named resource

The tag-based access control method is recommended when many resources and entities are involved. Although it seems like this option requires more steps, tag-based access control helps data lake administrators control relationships between each user and table via tags dynamically. The named resource method provides the data lake administrator with a more straightforward way to manage catalog permissions. You can choose the method that best fits your requirement.


About the author

Yumiko Kanasugi is a Solutions Architect with Amazon Web Services Japan, supporting digital native business customers to utilize AWS.

How Ribbon Communications Built a Scalable, Resilient Robocall Mitigation Platform

Post Syndicated from Siva Rajamani original https://aws.amazon.com/blogs/architecture/how-ribbon-communications-built-a-scalable-resilient-robocall-mitigation-platform/

Ribbon Communications provides communications software, and IP and optical networking end-to-end solutions that deliver innovation, unparalleled scale, performance, and agility to service providers and enterprise.

Ribbon Communications is helping customers modernize their networks. In today’s data-hungry, 24/7 world, this equates to improved competitive positioning and business outcomes. Companies are migrating from on-premises equipment for telephony services and looking for equivalent as a service (aaS) offerings. But these solutions must still meet the stringent resiliency, availability, performance, and regulatory requirements of a telephony service.

The telephony world is inundated with robocalls. In the United States alone, there were an estimated 50.5 billion robocalls in 2021! In this blog post, we describe the Ribbon Identity Hub – a holistic solution for robocall mitigation. The Ribbon Identity Hub enables services that sign and verify caller identity, which is compliant to the ATIS standards under the STIR/SHAKEN framework. It also evaluates and scores calls for the probability of nuisance and fraud.

Ribbon Identity Hub is implemented in Amazon Web Services (AWS). It is a fully managed service for telephony service providers and enterprises. The solution is secure, multi-tenant, automatic scaling, and multi-Region, and enables Ribbon to offer managed services to a wide range of telephony customers. Ribbon ensures resiliency and performance with efficient use of resources in the telephony environment, where load ratios between busy and idle time can exceed 10:1.

Ribbon Identity Hub

The Ribbon Identity Hub services are separated into a data (call-transaction) plane, and a control plane.

Data plane (call-transaction)

The call-transaction processing is typically invoked on a per-call-setup basis where availability, resilience, and performance predictability are paramount. Additionally, due to high variability in load, automatic scaling is a prerequisite.

Figure 1. Data plane architecture

Figure 1. Data plane architecture

Several AWS services come together in a solution that meets all these important objectives:

  1. Amazon Elastic Container Service (ECS): The ECS services are set up for automatic scaling and span two Availability Zones. This provides the horizontal scaling capability, the self-healing capacity, and the resiliency across Availability Zones.
  2. Elastic Load Balancing – Application Load Balancer (ALB): This provides the ability to distribute incoming traffic to ECS services as the target. In addition, it also offers:
    • Seamless integration with the ECS Auto Scaling group. As the group grows, traffic is directed to the new instances only when they are ready. As traffic drops, traffic is drained from the target instances for graceful scale down.
    • Full support for canary and linear upgrades with zero downtime. Maintains full-service availability without any changes or even perception for the client devices.
  3. Amazon Simple Storage Service (S3): Transaction detail records associated with call-related requests must be securely and reliably maintained for over a year due to billing and other contractual obligations. Amazon S3 simplifies this task with high durability, lifecycle rules, and varied controls for retention.
  4. Amazon DynamoDB: Building resilient services is significantly easier when the compute processing can be stateless. Amazon DynamoDB facilitates such stateless architectures without compromise. Coupled with the availability of the Amazon DynamoDB Accelerator (DAX) caching layer, the solution can meet the extreme low latency operation requirements.
  5. AWS Key Management Service (KMS): Certain tenant configuration is highly confidential and requires elevated protection. Furthermore, the data is part of the state that must be recovered across Regions in disaster recovery scenarios. To meet the security requirements, the KMS is used for envelope encryption using per-tenant keys. Multi-Region KMS keys facilitates the secure availability of this state across Regions without the need for application-level intervention when replicating encrypted data.
  6. Amazon Route 53: For telephony services, any non-transient service failure is unacceptable. In addition to providing high degree of resiliency through Multi-AZ architecture, Identity Hub also provides Regional level high availability through its multi-Region active-active architecture. Route 53 with health checks provides for dynamic rerouting of requests within minutes to alternate Regions.

Control plane

The Identity Hub control plane is used for customer configuration, status, and monitoring. The API is REST-based. Since this is not used on a call-by-call basis, the requirements around latency and performance are less stringent, though the requirements around high resiliency and dynamic scaling still apply. In this area, ease of implementation and maintainability are key.

Figure 2. Control plane architecture

Figure 2. Control plane architecture

The following AWS services implement our control plane:

  1. Amazon API Gateway: Coupled with a custom authenticator, the API Gateway handles all the REST API credential verification and routing. Implementation of an API is transformed into implementing handlers for each resource, which is the application core of the API.
  2. AWS Lambda: All the REST API handlers are written as Lambda functions. By using the Lambda’s serverless and concurrency features, the application automatically gains self-healing and auto-scaling capabilities. There is also a significant cost advantage as billing is per millisecond of actual compute time used. This is significant for a control plane where usage is typically sparse and unpredictable.
  3. Amazon DynamoDB: A stateless architecture with Lambda and API Gateway, all persistent state must be stored in an external database. The database must match the resilience and auto-scaling characteristics of the rest of the control plane. DynamoDB easily fits the requirements here.

The customer portal, in addition to providing the user interface for control plane REST APIs, also delivers a rich set of user-customizable dashboards and reporting capability. Here again, the availability of various AWS services simplifies the implementation, and remains non-intrusive to the central call-transaction processing.

Services used here include:

  1. AWS Glue: Enables extraction and transformation of raw transaction data into a format useful for reporting and dashboarding. AWS Glue is particularly useful here as the data available is regularly expanding, and the use cases for the reporting and dashboarding increase.
  2. Amazon QuickSight: Provides all the business intelligence (BI) functionality, including the ability for Ribbon to offer separate author and reader access to their users, and implements tenant-based access separation.

Conclusion

Ribbon has successfully deployed Identity Hub to enable cloud hosted telephony services to mitigate robocalls. Telephony requirements around resiliency, performance, and capacity were not compromised. Identity Hub offers the benefits of a 24/7 fully managed service requiring no additional customer on-premises equipment.

Choosing AWS services for Identity Hub gives Ribbon the ability to scale and meet future growth. The ability to dynamically scale the service in and out also brings significant cost advantages in telephony applications where busy hour traffic is significantly higher than idle time traffic. In addition, the availability of global AWS services facilitates the deployment of services in customer-local geographic locations to meet performance requirements or local regulatory compliance.

How Experian uses Amazon SageMaker to Deliver Affordability Verification 

Post Syndicated from Haresh Nandwani original https://aws.amazon.com/blogs/architecture/how-experian-uses-amazon-sagemaker-to-deliver-affordability-verification/

Financial Service (FS) providers must identify patterns and signals in a customer’s financial behavior to provide deeper, up-to-the-minute, insight into their affordability and credit risk. FS providers use these insights to improve decision making and customer management capabilities. Machine learning (ML) models and algorithms play a significant role in automating, categorising, and deriving insights from bank transaction data.

Experian publishes Categorisation-as-a-Service (CaaS) ML models that automate analysis of bank and credit card transactions, to be deployed in Amazon SageMaker. Driven by a suite of Experian proprietary algorithms, these models categorise a customer’s bank or credit card transactions into one of over 180 different income and expenditure categories. The service turns these categorised transactions into a set of summarised insights that can help a business better understand their customer and make more informed decisions. These insights provide a detailed picture of a customer’s financial circumstances and resilience by looking at verified income, expenditure, and credit behavior.

This blog demonstrates how financial service providers can introduce affordability verification and categorisation into their digital journeys by deploying Experian CaaS ML models on SageMaker. You don’t need significant ML knowledge to start using Amazon SageMaker and Experian CaaS.

Affordability verification and data categorisation in digital journeys

Product onboarding journeys are increasingly digital. Most financial service providers expect most of these journeys to initiate and complete online. An example journey would be consumers looking to apply for credit with their existing FS provider. These journeys typically involve FS providers performing affordability verification to ensure consumers are offered products they can afford. FS providers can now use Experian CaaS ML models available via AWS Marketplace to generate real-time financial insights and affordability verification for their customers.

Figure 1 depicts a typical digital journey for consumers applying for credit.

Figure 1. Customer journey for consumers applying for credit

Figure 1. Customer journey for consumers applying for credit

  1. Data categorisation for transactional data. Existing transactional data for current consumers is typically sourced from on-premises data sources into a data lake in the cloud. It is then prepared and transformed for processing and analytics. This analysis is done based on the FS provider’s existing consent in compliance with relevant data protection laws. Additional transaction information for other accounts not held by the lender can be sourced from Open Banking and categorised separately.
  2. Store categorised transactions. Background processes run a SageMaker batch transform job using the Experian CaaS Data Categorisation model to categorise this transactional data.
  3. Consumer applies for credit. Consumers use the FS providers’ existing front-end web, mobile, or any other digital channel to apply for credit.
  4. FS provider retrieves up-to-date insights. Insights are generated in real time using the Experian CaaS insights model deployed as endpoints in SageMaker and returned to the consumer-facing digital channel.
  5. FS provider makes credit decision. The channel app consolidates these insights to decide on product eligibility and drive customer journeys.

Deploying and publishing Experian CaaS ML models to Amazon SageMaker

Figure 2 demonstrates the technical solution for the customer journey described in the preceding section.

Figure 2. Credit application – technical solution using Amazon SageMaker and Experian CaaS ML models

Figure 2. Credit application – technical solution using Amazon SageMaker and Experian CaaS ML models

  1. Financial Service providers can use AWS Data Migration Service (AWS DMS) to replicate transactional data from their on-premises systems such as their core banking systems to Amazon S3. Customers can source this transactional data into a highly available and scalable data lake solution on AWS. Refer to AWS DMS documentation for technical details on supported database sources.
  2. FS providers can use AWS Glue, a serverless data integration service, to cleanse, prepare, and transform the transactional data into formats supported by the Experian CaaS ML models.
  3. FS providers can subscribe and download CaaS ML models built for SageMaker from the AWS Marketplace.
  4. These models can be deployed to SageMaker hosting services as a SageMaker endpoint for real-time inference. Endpoints are fully managed by AWS, and can be set up to scale on demand and deployed in a Multi-AZ model for resilience. FS providers can use Amazon API Gateway and AWS Lambda to make these endpoints available to their consumer-facing applications.
  5. SageMaker also supports a batch transform mode for ML models, which in this scenario will be used to precategorise transactional data. This mode is also useful for use cases that require nearly continuous and regular analysis such as a regular anti-fraud assessment.
  6. Consumer requests for a financial product such as a credit card on an FS provider’s digital channels.
  7. These requests invoke SageMaker endpoints, which use Experian CaaS models to derive real-time insights.
  8. These insights are used to further drive the customer’s product journey. CaaS models are pre-trained and can return insights within the latency requirements of most real-time digital journeys.

Security and compliance using CaaS

AWS Marketplace models are scanned by AWS for common vulnerabilities and exposures (CVE). CVE is a list of publicly known information about security vulnerability and exposure. For details on infrastructure security applied by SageMaker, see Infrastructure Security in Amazon SageMaker.

Data security is a key concern for FS providers and sharing of data externally is challenging from a security and compliance perspective. The CaaS deployment model described here helps address these challenges as data owned by the FS provider remains within their control domain and AWS account. There is no requirement for this data to be shared with Experian. This means the customer’s personal financial information is retained by the FS provider. FS providers cannot access the model code as it is running in a locked SageMaker environment.

AWS Marketplace models such as the Experian CaaS ML models are deployed in a network isolation mode. This ensures that the models cannot make any outbound network calls, even to other AWS services such as Amazon S3. SageMaker still performs download and upload operations against Amazon S3 in isolation from the model.

Implementing upgrades to CaaS ML models

ML model upgrades can be performed in place in Amazon SageMaker as vendors release newer versions of their models in AWS Marketplace. Endpoints can be set up in a blue/green deployment pattern to ensure that upgrades do not impact consumers and be safely rolled back with no business interruptions.

Conclusion

Automated categorisation of bank transaction data is now being used by FS providers as they start to realise the benefits it can bring to their business. This is being driven in part by the advent of Open Banking. Many FS providers have increased confidence in the accuracy and performance of automated categorisation engines. Suppliers such as Experian are providing transparency around their methodologies used to categorise data, which is also encouraging adoption.

In this blog, we covered how FS providers can introduce automated categorisation of data and affordability identification capabilities into their digital journeys. This can be done quickly and without significant in-house ML skills, using Amazon SageMaker and Experian CaaS ML models. SageMaker endpoints and batch transform capabilities enable the deployment of a highly scalable, secure, and extensible ML infrastructure with minimal development and operational effort.

Experian’s CaaS is available for use via the AWS Marketplace.

Enrich datasets for descriptive analytics with AWS Glue DataBrew

Post Syndicated from Daniel Rozo original https://aws.amazon.com/blogs/big-data/enrich-datasets-for-descriptive-analytics-with-aws-glue-databrew/

Data analytics remains a constantly hot topic. More and more businesses are beginning to understand the potential their data has to allow them to serve customers more effectively and give them a competitive advantage. However, for many small to medium businesses, gaining insight from their data can be challenging because they often lack in-house data engineering skills and knowledge.

Data enrichment is another challenge. Businesses that focus on analytics using only their internal datasets miss the opportunity to gain better insights by using reliable and credible public datasets. Small to medium businesses are no exception to this shortcoming, where obstacles such as not having sufficient data diminish their ability to make well-informed decisions based on accurate analytical insights.

In this post, we demonstrate how AWS Glue DataBrew enables businesses of all sizes to get started with data analytics with no prior coding knowledge. DataBrew is a visual data preparation tool that makes it easy for data analysts and scientists to clean and normalize data in preparation for analytics or machine learning. It includes more than 350 pre-built transformations for common data preparation use cases, enabling you to get started with cleaning, preparing, and combining your datasets without writing code.

For this post, we assume the role of a fictitious small Dutch solar panel distribution and installation company named OurCompany. We demonstrate how this company can prepare, combine, and enrich an internal dataset with publicly available data from the Dutch public entity, the Centraal Bureau voor de Statistiek (CBS), or in English, Statistics Netherlands. Ultimately, OurCompany desires to know how well they’re performing compared to the official reported values by the CBS across two important key performance indicators (KPIs): the amount of solar panel installations, and total energy capacity in kilowatt (kW) per region.

Solution overview

The architecture uses DataBrew for data preparation and transformation, Amazon Simple Storage Service (Amazon S3) as the storage layer of the entire data pipeline, and the AWS Glue Data Catalog for storing the dataset’s business and technical metadata. Following the modern data architecture best practices, this solution adheres to foundational logical layers of the Lake House Architecture.

The solution includes the following steps:

  1. We set up the storage layer using Amazon S3 by creating the following folders: raw-data, transformed-data, and curated-data. We use these folders to track the different stages of our data pipeline consumption readiness.
  2. Three CSV raw data files containing unprocessed data of solar panels as well as the external datasets from the CBS are ingested into the raw-data S3 folder.
  3. This part of the architecture incorporates both processing and cataloging capabilities:
    1. We use AWS Glue crawlers to populate the initial schema definition tables for the raw dataset automatically. For the remaining two stages of the data pipeline (transformed-data and curated-data), we utilize the functionality in DataBrew to directly create schema definition tables into the Data Catalog. Each table provides an up-to-date schema definition of the datasets we store on Amazon S3.
    2. We work with DataBrew projects as the centerpiece of our data analysis and transformation efforts. In here, we set up no-code data preparation and transformation steps, and visualize them through a highly interactive, intuitive user interface. Finally, we define DataBrew jobs to apply these steps and store transformation outputs on Amazon S3.
  4. To gain the benefits of granular access control and easily visualize data from Amazon S3, we take advantage of the seamless integration between Amazon Athena and Amazon QuickSight. This provides a SQL interface to query all the information we need from the curated dataset stored on Amazon S3 without the need to create and maintain manifest files.
  5. Finally, we construct an interactive dashboard with QuickSight to depict the final curated dataset alongside our two critical KPIs.

Prerequisites

Before beginning this tutorial, make sure you have the required Identity and Access Management (IAM) permissions to create the resources required as part of the solution. Your AWS account should also have an active subscription to QuickSight to create the visualization on processed data. If you don’t have a QuickSight account, you can sign up for an account.

The following sections provide a step-by-step guide to create and deploy the entire data pipeline for OurCompany without the use of code.

Data preparation steps

We work with the following files:

  • CBS Dutch municipalities and provinces (Gemeentelijke indeling op 1 januari 2021) – Holds all the municipalities and provinces names and codes of the Netherlands. Download the file gemeenten alfabetisch 2021. Open the file and save it as cbs_regions_nl.csv. Remember to change the format to CSV (comma-delimited).
  • CBS Solar power dataset (Zonnestroom; vermogen bedrijven en woningen, regio, 2012-2018) – This file contains the installed capacity in kilowatts and total number of installations for businesses and private homes across the Netherlands from 2012–2018. To download the file, go to the dataset page, choose the Onbewerkte dataset, and download the CSV file. Rename the file to cbs_sp_cap_nl.csv.
  • OurCompany’s solar panel historical data – Contains the reported energy capacity from all solar panel installations of OurCompany across the Netherlands from 2012 until 2018. Download the file.

As a result, the following are the expected input files we use to work with the data analytics pipeline:

  • cbs_regions_nl.csv
  • cbs_sp_cap_nl.csv
  • sp_data.csv

Set up the storage Layer

We first need to create the storage layer for our solution to store all raw, transformed, and curated datasets. We use Amazon S3 as the storage layer of our entire data pipeline.

  1. Create an S3 bucket in the AWS Region where you want to build this solution. In our case, the bucket is named cbs-solar-panel-data. You can use the same name followed by a unique identifier.
  2. Create the following three prefixes (folders) in your S3 bucket by choosing Create folder:
    1. curated-data/
    2. raw-data/
    3. transformed-data/

  3. Upload the three raw files to the raw-data/ prefix.
  4. Create two prefixes within the transformed-data/ prefix named cbs_data/ and sp_data/.

Create a Data Catalog database

After we set up the storage layer of our data pipeline, we need to create the Data Catalog to store all the metadata of the datasets hosted in Amazon S3. To do so, follow these steps:

  1. Open the AWS Glue console in the same Region of your newly created S3 bucket.
  2. In the navigation pane, choose Databases.
  3. Choose Add database.
  4. Enter the name for the Data Catalog to store all the dataset’s metadata.
  5. Name the database sp_catalog_db.

Create AWS Glue data crawlers

Now that we created the catalog database, it’s time to crawl the raw data prefix to automatically retrieve the metadata associated to each input file.

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Add a crawler with the name crawler_raw and choose Next.
  3. For S3 path, select the raw-data folder of the cbs-solar-panel-data prefix.
  4. Create an IAM role and name it AWSGlueServiceRole-cbsdata.
  5. Leave the frequency as Run on demand.
  6. Choose the sp_catalog_db database created in the previous section, and enter the prefix raw_ to identify the tables that belong to the raw data folder.
  7. Review the parameters of the crawler and then choose Finish.
  8. After the crawler is created, select it and choose Run crawler.

After successful deployment of the crawler, your three tables are created in the sp_catalog_db database: raw_sp_data_csv, raw_cbs_regions_nl_csv, and raw_cbs_sp_cap_nl_csv.

Create DataBrew raw datasets

To utilize the power of DataBrew, we need to connect datasets that point to the Data Catalog S3 tables we just created. Follow these steps to connect the datasets:

  1. On the DataBrew console, choose Datasets in the navigation pane.
  2. Choose Connect new dataset.
  3. Name the dataset cbs-sp-cap-nl-dataset.
  4. For Connect to new dataset, choose Data Catalog S3 tables.
  5. Select the sp_catalog_db database and the raw_cbs_sp_cap_nl_csv table.
  6. Choose Create dataset.

We need to create to two more datasets following the same process. The following table summarizes the names and tables of the catalog required for the new datasets.

Dataset name Data catalog table
sp-dataset raw_sp_data_csv
cbs-regions-nl-dataset raw_cbs_regions_nl_csv

Import DataBrew recipes

A recipe is a set of data transformation steps. These transformations are applied to one or multiple datasets of your DataBrew project. For more information about recipes, see Creating and using AWS Glue DataBrew recipes.

We have prepared three DataBrew recipes, which contain the set of data transformation steps we need for this data pipeline. Some of these transformation steps include: renaming columns (from Dutch to English), removing null or missing values, aggregating rows based on specific attributes, and combining datasets in the transformation stage.

To import the recipes, follow these instructions:

  1. On the DataBrew console, choose Recipes in the navigation pane.
  2. Choose Upload recipe.
  3. Enter the name of the recipe: recipe-1-transform-cbs-data.
  4. Upload the following JSON recipe.
  5. Choose Create recipe.

Now we need to upload two more recipes that we use for transformation and aggregation projects in DataBrew.

  1. Follow the same procedure to import the following recipes:
Recipe name Recipe source file
recipe-2-transform-sp-data Download
recipe-3-curate-sp-cbs-data Download
  1. Make sure the recipes are listed in the Recipes section filtered by All recipes.

Set up DataBrew projects and jobs

After we successfully create the Data Catalog database, crawlers, DataBrew datasets, and import the DataBrew recipes, we need to create the first transformation project.

CBS external data transformation project

The first project takes care of transforming, cleaning, and preparing cbs-sp-cap-nl-dataset. To create the project, follow these steps:

  1. On the DataBrew console, choose Projects in the navigation pane.
  2. Create a new project with the name 1-transform-cbs-data.
  3. In the Recipe details section, choose Edit existing recipe and choose the recipe recipe-1-transform-cbs-data.
  4. Select the newly created cbs-sp-cap-nl-dataset under Select a dataset.
  5. In the Permissions section, choose Create a new IAM role.
  6. As suffix, enter sp-project.
  7. Choose Create project.

After you create the project, a preview dataset is displayed as a result of applying the selected recipe. When you choose 10 more recipe steps, the service shows the entire set of transformation steps.

After you create the project, you need to grant put and delete S3 object permissions to the created role AWSGlueDataBrewServiceRole-sp-project on IAM. Add an inline policy using the following JSON and replace the resource with your S3 bucket name:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": "arn:aws:s3:::<your-S3-bucket-name>/*"
        }
    ]
}

This role also needs permissions to access the Data Catalog. To grant these permissions, add the managed policy AWSGlueServiceRole to the role.

CBS external data transformation job

After we define the project, we need to configure and run a job to apply the transformation across the entire raw dataset stored in the Raw-data folder of your S3 bucket. To do so, you need to do the following:

  1. On the DataBrew project page, choose Create job.
  2. For Job name, enter 1-transform-cbs-data-job.
  3. For Output to, choose Data Catalog S3 tables.
  4. For File type¸ choose Parquet.
  5. For Database name, choose sp_catalog_db.
  6. For Table name, choose Create new table.
  7. For Catalog table name, enter transformed_cbs_data.
  8. For S3 location, enter s3://<your-S3-bucket-name>/transformed-data/cbs_data/.
  9. In the job output settings section, choose Settings.
  10. Select Replace output files for each job run and then choose Save.
  11. In the permissions section, choose the automatically created role with the sp-project suffix; for example, AWSGlueDataBrewServiceRole-sp-project.
  12. Review the job details once more and then choose Create and run job.
  13. Back in the main project view, choose Job details.

After a few minutes, the job status changes from Running to Successful. Choose the output to go to the S3 location where all the generated Parquet files are stored.

Solar panels data transformation stage

We now create the second phase of the data pipeline. We create a project and a job using the same procedure described in the previous section.

  1. Create a DataBrew project with the following parameters:
    1. Project name2-transform-sp-data
    2. Imported reciperecipe-2-transform-sp-data
    3. Datasetsp_dataset
    4. Permissions roleAWSGlueDataBrewServiceRole-sp-project
  2. Create and run another DataBrew job with the following parameters:
    1. Job name2-transform-sp-data-job
    2. Output to – Data Catalog S3 tables
    3. File type – Parquet
    4. Database namesp_catalog_db
    5. Create new table with table nametransformed_sp_data
    6. S3 locations3://<your-S3-bucket-name>/transformed-data/sp_data/
    7. Settings – Replace output files for each job run.
    8. Permissions roleAWSGlueDataBrewServiceRole-sp-project
  3. After the job is complete, create the DataBrew datasets with the following parameters:
Dataset name Data catalog table
transformed-cbs-dataset awsgluedatabrew_transformed_cbs_data
transformed-sp-dataset awsgluedatabrew_transformed_sp_data

You should now see five items as part of your DataBrew dataset.

Data curation and aggregation stage

We now create the final DataBrew project and job.

  1. Create a DataBrew project with the following parameters:
    1. Project name3-curate-sp-cbs-data
    2. Imported reciperecipe-3-curate-sp-cbs-data
    3. Datasettransformed_sp_dataset
    4. Permissions roleAWSGlueDataBrewServiceRole-sp-project
  2. Create a DataBrew job with the following parameters:
    1. Job name3-curate-sp-cbs-data-job
    2. Output to – Data Catalog S3 tables
    3. File type – Parquet
    4. Database namesp_catalog_db
    5. Create new table with table namecurated_data
    6. S3 locations3://<your-S3-bucket-name>/curated-data/
    7. Settings – Replace output files for each job run
    8. Permissions roleAWSGlueDataBrewServiceRole-sp-project

The last project defines a single transformation step; the join between the transformed-cbs-dataset and the transformed-sp-dataset based on the municipality code and the year.

The DataBrew job should take a few minutes to complete.

Next, check your sp_catalog_db database. You should now have raw, transformed, and curated tables in your database. DataBrew automatically adds the prefix awsgluedatabrew_ to both the transformed and curated tables in the catalog.

Consume curated datasets for descriptive analytics

We’re now ready to build the consumption layer for descriptive analytics with QuickSight. In this section, we build a business intelligence dashboard that reflects OurCompany’s solar panel energy capacity and installations participation in contrast to the reported values by the CBS from 2012–2018.

To complete this section, you need to have the default primary workgroup already set up on Athena in the same Region where you implemented the data pipeline. If it’s your first time setting up workgroups on Athena, follow the instructions in Setting up Workgroups.

Also make sure that QuickSight has the right permissions to access Athena and your S3 bucket. Then complete the following steps:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose Create a new dataset.
  3. Select Athena as the data source.
  4. For Data source name, enter sp_data_source.
  5. Choose Create data source.
  6. Choose AWSDataCatalog as the catalog and sp_catalog_db as the database.
  7. Select the table curated_data.
  8. Choose Select.
  9. In the Finish dataset creation section, choose Directly query your data and choose Visualize.
  10. Choose the clustered bar combo chart from the Visual types list.
  11. Expand the field wells section and then drag and drop the following fields into each section as shown in the following screenshot.
  12. Rename the visualization as you like, and optionally filter the report by sp_year using the Filter option.

From this graph, we can already benchmark OurCompany against the regional values reported by the CBS across two dimensions: the total amount of installations and the total kW capacity generated by solar panels.

We went one step further and created two KPI visualizations to empower our descriptive analytics capabilities. The following is our final dashboard that we can use to enhance our decision-making process.

Clean up resources

To clean all the resources we created for the data pipeline, complete the following steps:

  1. Remove the QuickSight analyses you created.
  2. Delete the dataset curated_data.
  3. Delete all the DataBrew projects with their associated recipes.
  4. Delete all the DataBrew datasets.
  5. Delete all the AWS Glue crawlers you created.
  6. Delete the sp_catalog_db catalog database; this removes all the tables.
  7. Empty the contents of your S3 bucket and delete it.

Conclusion

In this post, we demonstrated how you can begin your data analytics journey. With DataBrew, you can prepare and combine the data you already have with publicly available datasets such as those from the Dutch CBS (Centraal Bureau voor de Statistiek) without needing to write a single line of code. Start using DataBrew today and enrich key datasets in AWS for enhanced descriptive analytics capabilities.


About the Authors

Daniel Rozo is a Solutions Architect with Amazon Web Services based out of Amsterdam, The Netherlands. He is devoted to working with customers and engineering simple data and analytics solutions on AWS. In his free time, he enjoys playing tennis and taking tours around the beautiful Dutch canals.

Maurits de Groot is an intern Solutions Architect at Amazon Web Services. He does research on startups with a focus on FinTech. Besides working, Maurits enjoys skiing and playing squash.


Terms of use: Gemeentelijke indeling op 1 januari 2021, Zonnestroom; vermogen bedrijven en woningen, regio (indeling 2018), 2012-2018, and copies of these datasets redistributed by AWS, are licensed under the Creative Commons 4.0 license (CC BY 4.0), sourced from Centraal Bureau voor de Statistiek (CBS). The datasets used in this solution are modified to rename columns from Dutch to English, remove null or missing values, aggregate rows based on specific attributes, and combine the datasets in the final transformation. Refer to the CC BY 4.0 use, adaptation, and attribution requirements for additional information.

Query cross-account AWS Glue Data Catalogs using Amazon Athena

Post Syndicated from Louis Hourcade original https://aws.amazon.com/blogs/big-data/query-cross-account-aws-glue-data-catalogs-using-amazon-athena/

Many AWS customers rely on a multi-account strategy to scale their organization and better manage their data lake across different projects or lines of business. The AWS Glue Data Catalog contains references to data used as sources and targets of your extract, transform, and load (ETL) jobs in AWS Glue. Using a centralized Data Catalog offers organizations a unified metadata repository and minimizes the administrative overhead related to sharing data across different accounts, thereby expanding access to the data lake.

Amazon Athena is one of the popular choices to run analytical queries in data lakes. This interactive query service makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you’re charged based on the amount of data scanned by your queries.

In May 2021, Athena introduced the ability to query Data Catalogs across multiple AWS accounts, enabling you to access your data lake without the complexity of replicating catalog metadata in individual AWS accounts. This blog post details the procedure for using the feature.

Solution overview

The following diagram shows the necessary components used in two different accounts (consumer account and producer account, hosting a central Data Catalog) and the flow between the two for cross-account Data Catalog access using Athena.

Our use case showcases Data Catalog sharing between two accounts:

  • Producer account – The account that administrates the central Data Catalog
  • Consumer account – The account querying data from the producer’s Data Catalog (the central Data Catalog)

In this walkthrough, we use the following two tables, extracted from an ecommerce dataset:

  • The orders table logs the website’s orders and contains the following key attributes:
    • Row ID­ – Unique entry identifier in the orders table
    • Order ID – Unique order identifier
    • Order date – Date the order was placed
    • Profit – Profit value of the order
  • The returns table logs the returned items and contains the following attributes:
    • Returned – If the order has been returned (Yes/No)
    • Order ID – Unique order identifier
    • Market – Region market

We walk you through the following high-level steps to use this solution:

  1. Set up the producer account.
  2. Set up the consumer account.
  3. Set up permissions.
  4. Register the producer account in the Data Catalog.
  5. Query your data.

You use Athena in the consumer account to perform different operations using the producer account’s Data Catalog.

First, you use the consumer account to query the orders table in the producer account’s Data Catalog.

Next, you use the consumer account to join the two tables and retrieve information about lost profit from returned items. The returns table is in the consumer’s Data Catalog, and the orders table is in the producer’s.

Prerequisites

The following are the prerequisites for this walkthrough:

This lists all your Athena workgroups. Make sure that the one you use runs on Athena engine version 2.

If all your workgroups are using Athena engine version 1, you need to update the engine version of an existing workgroup or create a new workgroup with the appropriate version.

Set up the producer account

In the producer account, complete the following steps:

  1. Create an S3 bucket for your producer’s data. For information about how to secure your S3 bucket, see Security Best Practices for Amazon S3.
  2. In this bucket, create a prefix named orders.
  3. Download the orders table in CSV format and upload it to the orders prefix.
  4. Run the following Athena query to create the producer’s database:
CREATE DATABASE producer_database
  COMMENT 'Producer data'
  1. Run the following Athena query to create the orders table in the producer’s database. Make sure to replace <your-producer-s3-bucket-name> with the name of the bucket you created.
CREATE EXTERNAL TABLE producer_database.orders(
  `row id` bigint, 
  `order id` string, 
  `order date` string, 
  `ship date` string, 
  `ship mode` string, 
  `customer id` string, 
  `customer name` string, 
  `segment` string, 
  `city` string, 
  `state` string, 
  `country` string, 
  `postal code` bigint, 
  `market` string, 
  `region` string, 
  `product id` string, 
  `category` string, 
  `sub-category` string, 
  `product name` string, 
  `sales` string, 
  `quantity` bigint, 
  `discount` string, 
  `profit` string, 
  `shipping cost` string, 
  `order priority` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY '\;'
LOCATION
  's3://<your-producer-s3-bucket-name>/orders/'
TBLPROPERTIES (
  'skip.header.line.count'='1'
)

Set up the consumer account

In the consumer account, complete the following steps:

  1. Create an S3 bucket for your consumer’s data.
  2. In this bucket, create a prefix named returns.
  3. Download the returns table in CSV format and upload it to the returns prefix.
  4. Run the following Athena query to create the consumer’s database:
CREATE DATABASE consumer_database
COMMENT 'Consumer data'
  1. Run the following Athena query to create the returns table in the consumer’s database. Make sure to replace <your-consumer-s3-bucket-name> with the name of the bucket you created.
CREATE EXTERNAL TABLE consumer_database.returns(
  `returned` string, 
  `order id` string, 
  `market` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY '\;' 
LOCATION
  's3://<your-consumer-s3-bucket-name>/returns/'
TBLPROPERTIES (
  'skip.header.line.count'='1'
)

Set up permissions

For the consumer account to query data in the producer account, we need to set up permissions.

First, we give the consumer account permission to access the producer account’s AWS Glue resources.

  1. In the producer account’s Data Catalog settings, add the following AWS Glue resource policy, which grants the consumer account access to the Data Catalog:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<Consumer-account-id>:role/<role-in-consumer-account>"
            },
            "Action": [
        "glue:GetDatabases",
        "glue:GetTables"
      ],
            "Resource": [
                "arn:aws:glue:<Region>:<Producer-account-id>:catalog",
                "arn:aws:glue:<Region>:<Producer-account-id>:database/producer-database",
                "arn:aws:glue:<Region>:<Producer-account-id>:table/producer-database/orders"
            ]
        }
    ]
}

Next, we give the consumer account permission to list and get data from the S3 bucket in the producer account.

  1. In the producer account, add the following S3 bucket policy to the bucket <Producer-bucket>, which stores the data:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<Consumer-account-id>:role/<role-in-consumer-account>"
            },
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::<Producer-bucket>",
                "arn:aws:s3:::<Producer-bucket>/orders/*"
            ]
        }
    ]
}

Register the producer account’s Data Catalog

At this stage, you have set up the required permissions to access the central Data Catalog in the producer account from the consumer account. You now need to register the central Data Catalog as a data source in Athena.

  1. In the consumer account, go the Athena console and choose Connect data source.
  2. Select S3 – AWS Glue Data Catalog as the data source selection.
  3. Select AWS Glue Data Catalog in another account.

You then need to provide some information regarding the central Data Catalog you want to register.

  1. For Data source name, enter a name for the catalog (for example, Central_Data_Catalog). This serves as an alias in the consumer account, pointing to the central Data Catalog in the producer account.
  1. For Catalog ID, enter the producer account ID.
  2. Choose Register to complete the process.

Query your data

You have now registered the central Data Catalog as a data source in the consumer account. In the Athena query editor, you can then choose Central_Data_Catalog as a data source. Under Database, you can see all the databases for which you were granted access in the producer account’s AWS Glue resource policy. The same applies for the tables. After completing the steps in the earlier sections, you should see the orders table from producer_database located in the producer account.

You can start querying the Data Catalog of the producer account directly from Athena in the consumer account. You can test this by running the following SQL query in Athena:

SELECT * FROM "Central_Data_Catalog"."producer_database"."orders" limit 10;

This SQL query extracts the first 10 rows of the orders table located in the producer account.

You just queried a Data Catalog located in another AWS account, which enables you to easily access your central Data Catalog and scale your data lake strategy.

Now, let’s see how we can join two tables that are in different AWS accounts. In our scenario, the returns table is in the consumer account and the orders table is in the producer account. Suppose you want to join the two tables and see the total amount of items returned in each market. The Athena built-in support for cross-account Data Catalogs makes this operation easy. In the Athena query editor, run the following SQL query:

SELECT
returns_tb.Market as Market,
sum(orders_tb.quantity) as Total_Quantity
FROM "Central_Data_Catalog"."producer_database"."orders" as orders_tb
JOIN "AwsDataCatalog"."consumer_database"."returns" as returns_tb
ON orders_tb."order id" = returns_tb."order id"
GROUP BY returns_tb.Market;

In this SQL query, you use both the consumer’s Data Catalog AwsDataCatalog and the producer’s Data Catalog Central_Data_Catalog to join tables and get insights from your data.

Limitations and considerations

The following are some limitations that you should take into consideration before using Athena built-in support for cross-account Data Catalogs:

  • This Athena feature is available only in Regions where Athena engine version 2 is supported. For a list of Regions that support Athena engine version 2, see Athena engine version 2. To upgrade a workgroup to engine version 2, see Changing Athena Engine Versions.
  • As of this writing, CREATE VIEW statements that include a cross-account Data Catalog are not supported.
  • Cross-Region Data Catalog queries are not supported.

Clean up

After you query and analyze the data, you should clean up the resources used in this tutorial to prevent any recurring AWS costs.

To clean up the resources, navigate to the Amazon S3 console in both the provider and consumer accounts, and empty the S3 buckets. Also, navigate to the AWS Glue console and delete the databases.

Conclusion

In this post, you learned how to query data from multiple accounts using Athena, which allows your organization to access to a centralized Data Catalog. We hope that this post helps you build and explore your data lake across multiple accounts.

To learn more about AWS tools to manage access to your data, check out AWS Lake Formation. This service facilitates setting up a centralized data lake and allows you to grant users and ETL jobs cross-account access to Data Catalog metadata and underlying data.


About the Authors

Louis Hourcade is a Data Scientist in the AWS Professional Services team. He works with AWS customer across various industries to accelerate their business outcomes with innovative technologies. In his spare time he enjoys running, climbing big rocks, and surfing (not so big) waves.

Sara Kazdagli is a Professional Services consultant specialized in data analytics and machine learning. She helps customers across different industries build innovative solutions and make data-driven decisions. Sara holds a MSc in Software engineering and a MSc in data science. In her spare time, she likes to go on hikes and walks with her australian shepherd dog Kiba.

Jahed Zaïdi is an AI/ML & Big Data specialist at AWS Professional Services. He is a builder and a trusted advisor to companies across industries, helping them innovate faster and on a larger scale. As a lifelong explorer, Jahed enjoys discovering new places, cultures, and outdoor activities.

Ibotta builds a self-service data lake with AWS Glue

Post Syndicated from Erik Franco original https://aws.amazon.com/blogs/big-data/ibotta-builds-a-self-service-data-lake-with-aws-glue/

This is a guest post co-written by Erik Franco at Ibotta.

Ibotta is a free cash back rewards and payments app that gives consumers real cash for everyday purchases when they shop and pay through the app. Ibotta provides thousands of ways for consumers to earn cash on their purchases by partnering with more than 1,500 brands and retailers.

At Ibotta, we process terabytes of data every day. Our vision is to allow for these datasets to be easily used by data scientists, decision-makers, machine learning engineers, and business intelligence analysts to provide business insights and continually improve the consumer and saver experience. This strategy of data democratization has proven to be a key pillar in the explosive growth Ibotta has experienced in recent years.

This growth has also led us to rethink and rebuild our internal technology stacks. For example, as our datasets began to double in size every year combined with complex, nested JSON data structures, it became apparent that our data warehouse was no longer meeting the needs of our analytics teams. To solve this, Ibotta adopted a data lake solution. The data lake proved to be a huge success because it was a scalable, cost-effective solution that continued to fulfill the mission of data democratization.

The rapid growth that was the impetus for the transition to a data lake has now also forced upstream engineers to transition away from the monolith architecture to a microservice architecture. We now use event-driven microservices to build fault-tolerant and scalable systems that can react to events as they occur. For example, we have a microservice in charge of payments. Whenever a payment occurs, the service emits a PaymentCompleted event. Other services may listen to these PaymentCompleted events to trigger other actions, such as sending a thank you email.

In this post, we share how Ibotta built a self-service data lake using AWS Glue. AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development.

Challenge: Fitting flexible, semi-structured schemas into relational schemas

The move to an event-driven architecture, while highly valuable, presented several challenges. Our analytics teams use these events for use cases where low-latency access to real-time data is expected, such as fraud detection. These real-time systems have fostered a new area of growth for Ibotta and complement well with our existing batch-based data lake architecture. However, this change presented two challenges:

  • Our events are semi-structured and deeply nested JSON objects that don’t translate well to relational schemas. Events are also flexible in nature. This flexibility allows our upstream engineering teams to make changes as needed and thereby allows Ibotta to move quickly in order to capitalize on market opportunities. Unfortunately, this flexibility makes it very difficult to keep schemas up to date.
  • Adding to these challenges, in the last 3 years, our analytics and platform engineering teams have doubled in size. Our data processing team, however, has stayed the same size largely due to difficulty in hiring qualified data engineers who possess specialized skills in developing scalable pipelines and industry demand. This meant that our data processing team couldn’t keep up with the requests from our analytics teams to onboard new data sources.

Solution: A self-service data lake

To solve these issues, we decided that it wasn’t enough for the data lake to provide self-service data consumption features. We also needed self-service data pipelines. These would provide both the platform engineering and analytics teams with a path to make their data available within the data lake and with minimal to no data engineering intervention necessary. The following diagram illustrates our self-service data ingestion pipeline.

The pipeline includes the following components:

  1. Ibotta data stakeholders – Our internal data stakeholders wanted the capability to automatically onboard datasets. This user base includes platform engineers, data scientists, and business analysts.
  2. Configuration file – Our data stakeholders update a YAML file with specific details on what dataset they need to onboard. Sources for these datasets include our enterprise microservices.
  3. Ibotta enterprise microservices – Microservices make up the bulk of our Ibotta platform. Many of these microservices utilize events to asynchronously communicate important information. These events are also valuable for deriving analytics insights.
  4. Amazon Kinesis – After the configuration file is updated, data is immediately streamed to Amazon Kinesis. Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data so you can get timely insights and react quickly to new information. Streaming the data through Kinesis Data Streams and Kinesis Data Firehose gives us the flexibility to analyze the data in real time while also allowing us to store the data in Amazon Simple Storage Service (Amazon S3).
  5. Ibotta self-service data pipeline – This is the starting point of our data processing. We use Apache Airflow to orchestrate our pipelines once every hour.
  6. Amazon S3 raw data – Our data lands in Amazon S3 without any transformation. The complex nature of the JSON is retained for future processing or validation.
  7. AWS Glue – Our goal now is to take the complex nested JSON and create a simpler structure. AWS Glue provides a set of built-in transforms that we use to process this data. One of the transforms is Relationalize—an AWS Glue transform that takes semi-structured data and transforms it into a format that can be more easily analyzed by engines like Presto. This feature means that our analytics teams can continue to use the analytics engines they’re comfortable with and thereby lessen the impact of transitioning from relational data sources to semi-structured event data sources. The Relationalize function can flatten nested structures and create multiple dynamic frames. We use 80 lines of code to convert any JSON-based microservice message to a consumable table. We have provided this code base here as a reference and not for reuse.
    // Convert to a DynamicFrame and relationalize
       // Convert it back to DataFrame
       val dynamicFrame: DynamicFrame = DynamicFrame(df, glueContext)
       val dynamicFrameCollection: Seq[DynamicFrame] = dynamicFrame.relationalize(rootTableName = glueSourceTable,
         stagingPath = glueTempStorage,
         options = JsonOptions.empty)
       val relationalizedDF: Dataset[Row] = removeColumnDotNotationRelationalize(dynamicFrameCollection(0).toDF())
       // Get rid of dot-notation and repartition it
       val repartitionedDF: Dataset[Row] = relationalizedDF.repartition(finalRepartitionValue.toInt)
       // Write it out
       repartitionedDF
         .write
         .mode("overwrite")
         .option("compression", "snappy")
         .parquet(glueRelationalizeOutputS3Path)

  8. Amazon S3 curated – We then store the relationalized structures as Parquet format in Amazon S3.
  9. AWS Glue crawler AWS Glue crawlers allow us to automatically discover schema and catalogs in the AWS Glue Data Catalog. This feature is a core component of our self-service data pipelines because it removes the requirement of having a data engineer manually create or update the schemas. Previously, if a change needed to occur, it flowed through a communication path that included platform engineers, data engineers, and analytics. AWS Glue crawlers effectively remove the data engineers from this communication path. This means new datasets or changes to datasets are made available quickly within the data lake. It also frees up our data engineers to continue working on improvements to our self-service data pipelines and other data paved roadmap features.
  10. AWS Glue Data Catalog – A common problem in growing data lakes is that the datasets can become harder and harder to work with. A common reason for this is a lack of discoverability of data within the data lake as well as a lack of clear understanding of what the datasets are conveying. The AWS Glue Catalog is a feature that works in conjunction with AWS Glue crawlers to provide data lake users with searchable metadata for different data lake datasets. As AWS Glue crawlers discover new datasets or updates, they’re recorded into the Data Catalog. You can then add descriptions at the table or fields levels for these datasets. This cuts down on the level of tribal knowledge that exists between various data lake consumers and makes it easy for these users to self-serve from the data lake.
  11. End-user data consumption – The end-users are the same as our internal stakeholders called out in Step 1.

Benefits

The AWS Glue capabilities we described make it a core component of building our self-service data pipelines. When we initially adopted AWS Glue, we saw a three-fold decrease in our OPEX costs as compared to our previous data pipelines. This was further enhanced when AWS Glue moved to per-second billing. To date, AWS Glue has allowed us to realize a five-fold decrease in OPEX costs. Also, AWS Glue requires little to no manual intervention to ingest and process our over 200 complex JSON objects. This allows Ibotta to utilize AWS Glue each day as a key component in providing actionable data to the organization’s growing analytics and platform engineering teams.

We took away the following learnings in building self-service data platforms:

Conclusion and next steps

With the self-service data lake we have established, our business teams are realizing the benefits of speed and agility. As next steps, we’re going to improve our self-service pipeline with the following features:

  • AWS Glue streaming – Use AWS Glue streaming for real-time relationalization. With AWS Glue streaming, we can simplify our self-service pipelines by potentially getting rid of our orchestration layer while also getting data into the data lake sooner.
  • Support for ACID transactions – Implement data formats in the data lake that allow for ACID transactions. A benefit of this ACID layer is the ability to merge streaming data into data lake datasets.
  • Simplify data transport layers – Unify the data transport layers between the upstream platform engineering domains and the data domain. From the time we first implemented an event-driven architecture at Ibotta to today, AWS has offered new services such as Amazon EventBridge and Amazon Managed Streaming for Apache Kafka (Amazon MSK) that have the potential to simplify certain facets of our self-service and data pipelines.

We hope that this blog post will inspire your organization to build a self-service data lake using serverless technologies to accelerate your business goals.


About the Authors

Erik Franco is a Data Architect at Ibotta and is leading Ibotta’s implementation of its next-generation data platform. Erik enjoys fishing and is an avid hiker. You can often find him hiking one of the many trails in Colorado with his lovely wife Marlene and wonderful dog Sammy.

Shiv Narayanan is Global Business Development Manager for Data Lakes and Analytics solutions at AWS. He works with AWS customers across the globe to strategize, build, develop and deploy modern data platforms. Shiv loves music, travel, food and trying out new tech.

Matt Williams is a Senior Technical Account Manager for AWS Enterprise Support. He is passionate about guiding customers on their cloud journey and building innovative solutions for complex problems. In his spare time, Matt enjoys experimenting with technology, all things outdoors, and visiting new places.

Introducing PII data identification and handling using AWS Glue DataBrew

Post Syndicated from Harsh Vardhan Singh Gaur original https://aws.amazon.com/blogs/big-data/introducing-pii-data-identification-and-handling-using-aws-glue-databrew/

AWS Glue DataBrew, a visual data preparation tool, can now identify and handle sensitive data by applying advance transformations like redaction, replacement, encryption, and decryption on your personally identifiable information (PII) data. With exponential growth of data, companies are handling huge volumes and a wide variety of data coming into their platform, including PII data. Identifying and protecting sensitive data at scale has become increasingly complex, expensive, and time-consuming. Organizations have to adhere to data privacy, compliance, and regulatory needs such as GDPR and CCPA. They need to identify sensitive data, including PII such as name, SSN, address, email, driver’s license, and more. Even after identification, it’s cumbersome to implement redaction, masking, or encryption of sensitive personal information at scale.

To enable data privacy and protection, DataBrew has launched PII statistics, which identifies PII columns and provide their data statistics when you run a profile job on your dataset. Furthermore, DataBrew has introduced PII data handling transformations, which enable you to apply data masking, encryption, decryption, and other operations on your sensitive data.

In this post, we walk through a solution in which we run a data profile job to identify and suggest potential PII columns present in a dataset. Next, we target PII columns in a DataBrew project and apply various transformations to handle the sensitive columns existing in the dataset. Finally, we run a DataBrew job to apply the transformations on the entire dataset and store the processed, masked, and encrypted data securely in Amazon Simple Storage Service (Amazon S3).

Solution overview

We use a public dataset that is available for download at Synthetic Patient Records with COVID-19. The data hosted within SyntheticMass has been generated by SyntheaTM, an open-source patient population simulation made available by The MITRE Corporation.

Download the zipped file 10k_synthea_covid19_csv.zip for this solution and unzip it locally. The solution uses the dummy data in the file patient.csv to demonstrate data redaction and encryption capability. The file contains 10,000 synthetic patient records in CSV format, including PII columns like driver’s license, birth date, address, SSN, and more.

The following diagram illustrates the architecture for our solution.

The steps in this solution are as follows:

  1. The sensitive data is stored in an S3 bucket. You create a DataBrew dataset by connecting to the data in Amazon S3.
  2. Run a DataBrew profile job to identify the PII columns present in the dataset by enabling PII statistics.
  3. After identification of PII columns, apply transformations to redact or encrypt column values as a part of your recipe.
  4. A DataBrew job runs the recipe steps on the entire data and generates output files with sensitive data redacted or encrypted.
  5. After the output data is written to Amazon S3, we create an external table on top in Amazon Athena. Data consumers can use Athena to query the processed and cleaned data.

Prerequisites

For this walkthrough, you need an AWS account. Use us-east-1 as your AWS Region to implement this solution.

Set up your source data in Amazon S3

Create an S3 bucket called databrew-clean-pii-data-<Your-Account-ID> in us-east-1 with the following prefixes:

  • sensitive_data_input
  • cleaned_data_output
  • profile_job_output

Upload the patient.csv file to the sensitive_data_input prefix.

Create a DataBrew dataset

To create a DataBrew dataset, complete the following steps:

  1. On the DataBrew console, in the navigation pane, choose Datasets.
  2. Choose Connect new dataset.
  3. For Dataset name, enter a name (for this post, Patients).
  4. Under Connect to new dataset, select Amazon S3 as your source.
  5. For Enter your source from S3, enter the S3 path to the patient.csv file. In our case, this is s3://databrew-clean-pii-data-<Account-ID>/ sensitive_data_input/patients.csv.
  6. Scroll to the bottom of the page and choose Create dataset.

Run a data profile job

You’re now ready to create your profile job.

  1. In the navigation pane, choose Datasets.
  2. Select the Patients dataset.
  3. Choose Run data profile and choose Create profile job.
  4. Name the job Patients - Data Profile Job.
  5. We run the data profile on the entire dataset, so for Data sample, select Full dataset.
  6. In the Job output settings section, point to the profile_job_output S3 prefix where the data profile output is stored when the job is complete.
  7. Expand Data profile configurations, and select Enable PII statistics to identify PII columns when running the data profile job.

This option is disabled by default; you must enable it manually before running the data profile job.

  1. For PII categories, select All categories.
  2. Keep the remaining settings at their default.
  3. In the Permissions section, create a new AWS Identity and Access Management (IAM) role that is used by the DataBrew job to run the profile job, and use PII-DataBrew-Role as the role suffix.
  4. Choose Create and run job.

The job runs on the sample data and takes a few minutes to complete.

Now that we’ve run our profile job, we can review data profile insights about our dataset by choosing View data profile. We can also review the results of the profile through the visualizations on the DataBrew console and view the PII widget. This section provides a list of identified PII columns mapped to PII categories with column statistics. Furthermore, it suggests potential PII data that you can review.

Create a DataBrew project

After we identify the PII columns in our dataset, we can focus on handling the sensitive data in our dataset. In this solution, we perform redaction and encryption in our DataBrew project using the Sensitive category of transformations.

To create a DataBrew project for handling our sensitive data, complete the following steps:

  1. On the DataBrew console, choose Projects.
  2. Choose Create project.
  3. For Project name, enter a name (for this post, patients-pii-handling).
  4. For Select a dataset, select My datasets.
  5. Select the Patients dataset.
  6. Under Permissions, for Role name, choose the IAM role that we created previously for our DataBrew profile job AWSGlueDataBrewServiceRole-PII-DataBrew-Role.
  7. Choose Create project.

The dataset takes few minutes to load. When the dataset is loaded, we can start performing redactions. Let us start with the column SSN.

  1. For the SSN column, on the Sensitive menu, choose Redact data.
  2. Under Apply redaction, select Full string value.
  3. We redact all the non-alphanumeric characters and replace them with #.
  4. Choose Preview changes to compare the redacted values.
  5. Choose Apply.

On the Sensitive menu, all the data masking transformations—redact, replace, and hash data—are irreversible. After we finalize our recipe and run the DataBrew job, the job output to Amazon S3 is permanently redacted and we can’t recover it.

  1. Now, let’s apply redaction to multiple columns, assuming the following columns must not be consumed by any downstream users like data analyst, BI engineer, and data scientist:
    1. DRIVERS
    2. PASSPORT
    3. BIRTHPLACE
    4. ADDRESS
    5. LAT
    6. LON

In special cases, when we need to recover our sensitive data, instead of masking, we can encrypt our column values and when needed, decrypt the data to bring it back to its original format. Let’s assume we require a column value to be decrypted by a downstream application; in that case, we can encrypt our sensitive data.

We have two encryption options: deterministic and probabilistic. For use cases when we want to join two datasets on the same encrypted column, we should apply deterministic encryption. It makes sure that the encrypted value of all the distinct values is the same across DataBrew projects as long as we use the same AWS secret key. Additionally, keep in mind that when you apply deterministic encryption on your PII columns, you can only use DataBrew to decrypt those columns.

For our use case, let’s assume we want to perform deterministic encryption on a few of our columns.

  1. On the Sensitive menu, choose Deterministic encryption.
  2. For Source columns, select BIRTHDATE, DEATHDATE, FIRST, and LAST.
  3. For Encryption option, select Deterministic encryption.
  4. For Select secret, choose the databrew!default AWS secret.
  5. Choose Apply.
  6. After you finish applying all your transformations, choose Publish.
  7. Enter a description for the recipe version and choose Publish.

Create a DataBrew job

Now that our recipe is ready, we can create a job to apply the recipe steps to the Patients dataset.

  1. On the DataBrew console, choose Jobs.
  2. Choose Create a job.
  3. For Job name, enter a name (for example, Patient PII Making and Encryption).
  4. Select the Patients dataset and choose patients-pii-handling-recipe as your recipe.
  5. Under Job output settings¸ for File type, choose your final storage format to be Parquet.
  6. For S3 location, enter your S3 output as s3://databrew-clean-pii-data-<Account-ID>/cleaned_data_output/.
  7. For Compression, choose None.
  8. For File output storage, select Replace output files for each job run.
  9. Under Permissions, for Role name¸ choose the same IAM role we used previously.
  10. Choose Create and run job.

Create an Athena table

You can create tables by writing the DDL statement in the Athena query editor. If you’re not familiar with Apache Hive, you should review Creating Tables in Athena to learn how to create an Athena table that references the data residing in Amazon S3.

To create an Athena table, use the query editor and enter the following DDL statement:

CREATE EXTERNAL TABLE patient_masked_encrypted_data (
   `id` string, 
  `birthdate` string, 
  `deathdate` string, 
  `ssn` string, 
  `drivers` string, 
  `passport` string, 
  `prefix` string, 
  `first` string, 
  `last` string, 
  `suffix` string, 
  `maiden` string, 
  `marital` string, 
  `race` string, 
  `ethnicity` string, 
  `gender` string, 
  `birthplace` string, 
  `address` string, 
  `city` string, 
  `state` string, 
  `county` string, 
  `zip` int, 
  `lat` string, 
  `lon` string, 
  `healthcare_expenses` double, 
  `healthcare_coverage` double 
)
STORED AS PARQUET
LOCATION 's3://databrew-clean-pii-data-<Account-ID>/cleaned_data_output/'

Let’s validate the table output in Athena by running a simple SELECT query. The following screenshot shows the output.

We can clearly see the encrypted and redacted column values in our query output.

Cleaning up

To avoid incurring future charges, delete the resources created during this walkthrough.

Conclusion

As demonstrated in this post, you can use DataBrew to help identify, redact, and encrypt PII data. With these new PII transformations, you can streamline and simplify customer data management across industries such as financial services, government, retail, and much more.

Now that you can protect your sensitive data workloads to meet regulatory and compliance best practices, you can use this solution to build de-identified data lakes in AWS. Sensitive data fields remain protected throughout their lifecycle, whereas non-sensitive data fields remain in the clear. This approach can allow analytics or other business functions to operate on data without exposing sensitive data.


About the Authors

Harsh Vardhan Singh Gaur is an AWS Solutions Architect, specializing in Analytics. He has over 5 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Navnit Shukla is an AWS Specialist Solution Architect, Analytics, and is passionate about helping customers uncover insights from their data. He has been building solutions to help organizations make data-driven decisions.

Improve Amazon Athena query performance using AWS Glue Data Catalog partition indexes

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/improve-amazon-athena-query-performance-using-aws-glue-data-catalog-partition-indexes/

The AWS Glue Data Catalog provides partition indexes to accelerate queries on highly partitioned tables. In the post Improve query performance using AWS Glue partition indexes, we demonstrated how partition indexes reduce the time it takes to fetch partition information during the planning phase of queries run on Amazon EMR, Amazon Redshift Spectrum, and AWS Glue extract, transform, and load (ETL) jobs.

We’re pleased to announce Amazon Athena support for AWS Glue Data Catalog partition indexes. You can use the same indexes configured for Amazon EMR, Redshift Spectrum, and AWS Glue ETL jobs with Athena to reduce query planning times for highly partitioned tables, which is common in most data lakes on Amazon Simple Storage Service (Amazon S3).

In this post, we describe how to set up partition indexes and perform a few sample queries to demonstrate the performance improvement on Athena queries.

Set up resources with AWS CloudFormation

To help you get started quickly, we provide an AWS CloudFormation template, the same template we used in a previous post. You can review and customize it to suit your needs. Some of the resources this stack deploys incur costs when in use.

The CloudFormation template generates the following resources:

If you’re using AWS Lake Formation permissions, you need to make sure that the IAM user or role running AWS CloudFormation has the required permissions to create a database on the AWS Glue Data Catalog.

The tables created by the CloudFormation template use sample data located in an S3 public bucket. The data is partitioned by the columns year, month, day, and hour. There are 367,920 partition folders in total, and each folder has a single file in JSON format that contains an event similar to the following:

{
  "id": "95c4c9a7-4718-4031-9e79-b56b72220fbc",
  "value": 464.22130592811703
}

To create your resources, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For DatabaseName, leave as the default.
  5. Choose Next.
  6. On the next page, choose Next.
  7. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  8. Choose Create.

Stack creation can take up to 5 minutes. When the stack is complete, you have two Data Catalog tables: table_with_index and table_without_index. Both tables point to the same S3 bucket, as mentioned previously, which holds data for more than 42 years (1980–2021) in 367,920 partitions. Each partition folder includes a data.json file containing the event data. In the following sections, we demonstrate how the partition indexes improve query performance with these tables using an example that represents large datasets in a data lake.

Set up partition indexes

You can create up to three partition indexes per table for new and existing tables. If you want to create a new table with partition indexes, you can include a list of PartitionIndex objects with the CreateTable API call. To add a partition index to an existing table, use the CreatePartitionIndex API call. You can also perform these actions from the AWS Glue console.

Let’s configure a new partition index for the table table_with_index we created with the CloudFormation template.

  1. On the AWS Glue console, choose Tables.
  2. Choose the table table_with_index.
  3. Choose Partitions and indices.
  4. Choose Add new index.
  5. For Index name, enter year-month-day-hour.
  6. For Selected keys from schema, select year, month, day, and hour. Make that you choose each column in this order, and confirm that Partition key for each column is correctly configured as follows:
    1. year: Partition (0)
    2. month: Partition (1)
    3. day: Partition (2)
    4. hour: Partition (3)
  7. Choose Add index.

The Status column of the newly created partition index shows as Creating. We need to wait for the partition index to be Active before it can be used by query engines. It should take about 1 hour to process and build the index for 367,920 partitions.

When the partition index is ready for table_with_index, you can use it when querying with Athena. For table_without_index, you should expect to see no change in query latency because no partition indexes were configured.

Enable partition filtering

To enable partition filtering in Athena, you need to update the table properties as follows:

  1. On the AWS Glue console, choose Tables.
  2. Choose the table table_with_index.
  3. Choose Edit table.
  4. Under Table properties, add the following:
    1. Keypartition_filtering.enabled
    2. Valuetrue
  5. Choose Apply.

Alternatively, you can set this parameter by running an ALTER TABLE SET PROPERTIES query in Athena:

ALTER TABLE partition_index.table_with_index
SET TBLPROPERTIES ('partition_filtering.enabled' = 'true')

Query tables using Athena

Now that your table has filtering enabled for Athena, let’s query both tables to see the performance differences.

First, query the table without using the partition index. In the Athena query editor, enter the following query:

SELECT count(*), sum(value) 
FROM partition_index.table_without_index 
WHERE year='2021' AND month='04' AND day='01'

The following screenshot shows the query took 44.9 seconds.

Next, query the table with using the partition index. You need to use the columns that are configured for the indexes in the WHERE clause to gain these performance benefits. Run the following query:

SELECT count(*), sum(value) 
FROM partition_index.table_with_index 
WHERE year='2021' AND month='04' AND day='01'

The following screenshot shows the query took just 1.3 seconds to complete, which is significantly faster than the table without indexes.

Query planning is the phase where the table and partition metadata are fetched from the AWS Glue Data Catalog. With partition indexes enabled, retrieving only the partitions required by the query can be done more efficiently and therefore quicker. Let’s retrieve the execution details of each query by using the AWS Command Line Interface (AWS CLI) to compare planning statistics.

The following is the query execution details for the query that ran against a table without partition indexes:

$ aws athena get-query-execution --query-execution-id 5e972df6-11f8-467a-9eea-77f509a23573 --query QueryExecution.Statistics --output table
--------------------------------------------
|             GetQueryExecution            |
+---------------------------------+--------+
|  DataScannedInBytes             |  1782  |
|  EngineExecutionTimeInMillis    |  44914 |
|  QueryPlanningTimeInMillis      |  44451 |
|  QueryQueueTimeInMillis         |  278   |
|  ServiceProcessingTimeInMillis  |  47    |
|  TotalExecutionTimeInMillis     |  45239 |
+---------------------------------+--------+

The following is the query execution details for a query that ran against a table with partition indexes:

% aws athena get-query-execution --query-execution-id 31d0b4ae-ae8d-4836-b20b-317fa9d9b79a --query QueryExecution.Statistics --output table
-------------------------------------------
|            GetQueryExecution            |
+---------------------------------+-------+
|  DataScannedInBytes             |  1782 |
|  EngineExecutionTimeInMillis    |  1361 |
|  QueryPlanningTimeInMillis      |  384  |
|  QueryQueueTimeInMillis         |  190  |
|  ServiceProcessingTimeInMillis  |  58   |
|  TotalExecutionTimeInMillis     |  1609 |
+---------------------------------+-------+

QueryPlanningTimeInMillis represents the number of milliseconds that Athena took to plan the query processing flow. This includes the time spent retrieving table partitions from the data source. Because the query engine performs the query planning, the query planning time is a subset of engine processing time.

Comparing the stats for both queries, we can see that QueryPlanningTimeInMillis is significantly lower in the query using partition indexes. It went from 44 seconds to 0.3 seconds when using partition indexes. The improvement in query planning resulted in a faster overall query runtime, going from 45 seconds to 1.3 seconds—a 35 times greater performance improvement.

Clean up

Now to the final step, cleaning up the resources:

  1. Delete the CloudFormation stack.
  2. Confirm both tables have been deleted from the AWS Glue Data Catalog.

Conclusion

At AWS, we strive to improve the performance of our services and our customers’ experience. The AWS Glue Data Catalog is a fully managed, Apache Hive compatible metastore that enables a wide range of big data, analytics, and machine learning services, like Athena, Amazon EMR, Redshift Spectrum, and AWS Glue ETL, to access data in the data lake. Athena customers can now further reduce query latency by enabling partition indexes for your tables in Amazon S3. Using partition indexes can improve the efficiency of retrieving metadata for highly partitioned tables ranging in the tens and hundreds of thousands and millions of partitions.

You can learn more about AWS Glue Data Catalog partition indexes in Working with Partition Indexes, and more about Athena best practices in Best Practices When Using Athena with AWS Glue.


About the Author

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is passionate about architecting fast-growing data platforms, diving deep into distributed big data software like Apache Spark, building reusable software artifacts for data lakes, and sharing the knowledge in AWS Big Data blog posts. In his spare time, he enjoys having and watching killifish, hermit crabs, and grubs with his children.

Catalog and analyze Application Load Balancer logs more efficiently with AWS Glue custom classifiers and Amazon Athena

Post Syndicated from Ray Wang original https://aws.amazon.com/blogs/big-data/catalog-and-analyze-application-load-balancer-logs-more-efficiently-with-aws-glue-custom-classifiers-and-amazon-athena/

You can query Application Load Balancer (ALB) access logs for various purposes, such as analyzing traffic distribution and patterns. You can also easily use Amazon Athena to create a table and query against the ALB access logs on Amazon Simple Storage Service (Amazon S3). (For more information, see How do I analyze my Application Load Balancer access logs using Amazon Athena? and Querying Application Load Balancer Logs.) All queries are run against the whole table because it doesn’t define any partitions. If you have several years of ALB logs, you may want to use a partitioned table instead for better query performance and cost control. In fact, partitioning data is one of the Top 10 performance tuning tips for Athena.

However, because ALB log files aren’t stored in a Hive-style prefix (such as /year=2021/), the process of creating thousands of partitions using ALTER TABLE ADD PARTITION in Athena is cumbersome. This post shows a way to create and schedule an AWS Glue crawler with a Grok custom classifier that infers the schema of all ALB log files under the specified Amazon S3 prefix and populates the partition metadata (year, month, and day) automatically to the AWS Glue Data Catalog.

Prerequisites

To follow along with this post, complete the following prerequisites:

  1. Enable access logging of the ALBs, and have the files already ingested in the specified S3 bucket.
  2. Set up the Athena query result location. For more information, see Working with Query Results, Output Files, and Query History.

Solution overview

The following diagram illustrates the solution architecture.

To implement this solution, we complete the following steps:

  1. Prepare the Grok pattern for our ALB logs, and cross-check with a Grok debugger.
  2. Create an AWS Glue crawler with a Grok custom classifier.
  3. Run the crawler to prepare a table with partitions in the Data Catalog.
  4. Analyze the partitioned data using Athena and compare query speed vs. a non-partitioned table.

Prepare the Grok pattern for our ALB logs

As a preliminary step, locate the access log files on the Amazon S3 console, and manually inspect the files to observe the format and syntax. To allow an AWS Glue crawler to recognize the pattern, we need to use a Grok pattern to match against an expression and map specific parts into the corresponding fields. Approximately 100 sample Grok patterns are available in the Logstash Plugins GitHub, and we can write our own custom pattern if it’s not listed.

The following the basic syntax format for a Grok pattern %{PATTERN:FieldName}

The following is an example of an ALB access log:

http 2018-07-02T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 192.168.131.39:2817 10.0.0.1:80 0.000 0.001 0.000 200 200 34 366 "GET http://www.example.com:80/ HTTP/1.1" "curl/7.46.0" - - arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 "Root=1-58337262-36d228ad5d99923122bbe354" "-" "-" 0 2018-07-02T22:22:48.364000Z "forward" "-" "-" "10.0.0.1:80" "200" "-" "-"
https 2018-07-02T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 192.168.131.39:2817 10.0.0.1:80 0.086 0.048 0.037 200 200 0 57 "GET https://www.example.com:443/ HTTP/1.1" "curl/7.46.0" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 "Root=1-58337281-1d84f3d73c47ec4e58577259" "www.example.com" "arn:aws:acm:us-east-2:123456789012:certificate/12345678-1234-1234-1234-123456789012" 1 2018-07-02T22:22:48.364000Z "authenticate,forward" "-" "-" "10.0.0.1:80" "200" "-" "-"
h2 2018-07-02T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 10.0.1.252:48160 10.0.0.66:9000 0.000 0.002 0.000 200 200 5 257 "GET https://10.0.2.105:773/ HTTP/2.0" "curl/7.46.0" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 "Root=1-58337327-72bd00b0343d75b906739c42" "-" "-" 1 2018-07-02T22:22:48.364000Z "redirect" "https://example.com:80/" "-" "10.0.0.66:9000" "200" "-" "-"
ws 2018-07-02T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 10.0.0.140:40914 10.0.1.192:8010 0.001 0.003 0.000 101 101 218 587 "GET http://10.0.0.30:80/ HTTP/1.1" "-" - - arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 "Root=1-58337364-23a8c76965a2ef7629b185e3" "-" "-" 1 2018-07-02T22:22:48.364000Z "forward" "-" "-" "10.0.1.192:8010" "101" "-" "-"
wss 2018-07-02T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 10.0.0.140:44244 10.0.0.171:8010 0.000 0.001 0.000 101 101 218 786 "GET https://10.0.0.30:443/ HTTP/1.1" "-" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-west-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 "Root=1-58337364-23a8c76965a2ef7629b185e3" "-" "-" 1 2018-07-02T22:22:48.364000Z "forward" "-" "-" "10.0.0.171:8010" "101" "-" "-"
http 2018-11-30T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 192.168.131.39:2817 - 0.000 0.001 0.000 200 200 34 366 "GET http://www.example.com:80/ HTTP/1.1" "curl/7.46.0" - - arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 "Root=1-58337364-23a8c76965a2ef7629b185e3" "-" "-" 0 2018-11-30T22:22:48.364000Z "forward" "-" "-" "-" "-" "-" "-"
http 2018-11-30T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 192.168.131.39:2817 - 0.000 0.001 0.000 502 - 34 366 "GET http://www.example.com:80/ HTTP/1.1" "curl/7.46.0" - - arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 "Root=1-58337364-23a8c76965a2ef7629b185e3" "-" "-" 0 2018-11-30T22:22:48.364000Z "forward" "-" "LambdaInvalidResponse" "-" "-" "-" "-"

To map the first field, the Grok pattern might look like the following code:

%{DATA:type}\s

The pattern includes the following components:

  • DATA maps to .*?
  • type is the column name
  • \s is the whitespace character

To map the second field, the Grok pattern might look like the following:

%{TIMESTAMP_ISO8601:time}\s

This pattern has the following elements:

  • TIMESTAMP_ISO8601 maps to %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
  • time is the column name
  • \s is the whitespace character

When writing Grok patterns, we should also consider corner cases. For example, the following code is a normal case:

%{BASE10NUM:target_processing_time}\s

But when considering the possibility of null value, we should replace the pattern with the following:

%{DATA:target_processing_time}\s

When our Grok pattern is ready, we can test the Grok pattern with sample input using a third-party Grok debugger. The following pattern is a good start, but always remember to test it with the actual ALB logs.

%{DATA:type}\s+%{TIMESTAMP_ISO8601:time}\s+%{DATA:elb}\s+%{DATA:client}\s+%{DATA:target}\s+%{BASE10NUM:request_processing_time}\s+%{DATA:target_processing_time}\s+%{BASE10NUM:response_processing_time}\s+%{BASE10NUM:elb_status_code}\s+%{DATA:target_status_code}\s+%{BASE10NUM:received_bytes}\s+%{BASE10NUM:sent_bytes}\s+\"%{DATA:request}\"\s+\"%{DATA:user_agent}\"\s+%{DATA:ssl_cipher}\s+%{DATA:ssl_protocol}\s+%{DATA:target_group_arn}\s+\"%{DATA:trace_id}\"\s+\"%{DATA:domain_name}\"\s+\"%{DATA:chosen_cert_arn}\"\s+%{DATA:matched_rule_priority}\s+%{TIMESTAMP_ISO8601:request_creation_time}\s+\"%{DATA:actions_executed}\"\s+\"%{DATA:redirect_url}\"\s+\"%{DATA:error_reason}\"\s+\"%{DATA:target_list}\"\s+\"%{DATA:target_status_code_list}\"\s+\"%{DATA:classification}\"\s+\"%{DATA:classification_reason}\"

Keep in mind that when you copy the Grok pattern from your browser, in some cases there are extra spaces in the end of the lines. Make sure to remove these extra spaces.

Create an AWS Glue crawler with a Grok custom classifier

Before you create your crawler, you first create a custom classifier. Complete the following steps:

  1. On the AWS Glue console, under Crawler, choose Classifiers.
  2. Choose Add classifier.
  3. For Classifier name, enter alb-logs-classifier.
  4. For Classifier type¸ select Grok.
  5. For Classification, enter alb-logs.
  6. For Grok pattern, enter the pattern from the previous section.
  7. Choose Create.

Now you can create your crawler.

  1. Choose Crawlers in the navigation pane.
  2. Choose Add crawler.
  3. For Crawler name, enter alb-access-log-crawler.
  4. For Selected classifiers, enter alb-logs-classifier.

  5. Choose Next.
  6. For Crawler source type, select Data stores.
  7. For Repeat crawls of S3 data stores, select Crawl new folders only.
  8. Choose Next.
  9. For Choose a data store, choose S3.
  10. For Crawl data in, select Specified path in my account.
  11. For Include path, enter the path to your ALB logs (for example, s3://alb-logs-directory/AWSLogs/<ACCOUNT-ID>/elasticloadbalancing/<REGION>/).
  12. Choose Next.
  13. When prompted to add another data store, select No and choose Next.
  14. Select Create an IAM role, and give it a name such as AWSGlueServiceRole-alb-logs-crawler.
  15. For Frequency, choose Daily.
  16. Indicate your start hour and minute.
  17. Choose Next.
  18. For Database, enter elb-access-log-db.
  19. For Prefix added to tables, enter alb_logs_.
  20. Expand Configuration options.
  21. Select Update all new and existing partitions with metadata from the table.
  22. Keep the other options at their default.
  23. Choose Next.
  24. Review your settings and choose Finish.

Run your AWS Glue crawler

Next, we run our crawler to prepare a table with partitions in the Data Catalog.

  1. On the AWS Glue console, choose Crawlers.
  2. Select the crawler we just created.
  3. Choose Run crawler.

When the crawler is complete, you receive a notification indicating that a table has been created.

Next, we review and edit the schema.

  1. Under Databases, choose Tables.
  2. Choose the table alb_logs_<region>.
  3. Cross-check the column name and corresponding data type.

The table has three columns: partiion_0, partition_1, and partition_2.

  1. Choose Edit schema.
  2. Rename the columns year, month, and day.
  3. Choose Save.

Analyze the data using Athena

Next, we analyze our data by querying the access logs. We compare the query speed between the following tables:

  • Non-partitioned table – All data is treated as a single table
  • Partitioned table – Data is partitioned by year, month, and day

Query the non-partitioned table

With the non-partitioned table, if we want to query access logs on a specific date, we have to write the WHERE clause using the LIKE operator because the data column was interpreted as a string. See the following code:

SELECT COUNT(1) FROM "elb-access-log-db"."alb_logs" WHERE type='h2' AND time LIKE '2020-12-29%';

The query takes 5.25 seconds to complete, with 3.15 MB data scanned.

Query the partitioned table

With the year, month, and day columns as partitions, we can use the following statement to query access logs on the same day:

SELECT COUNT(1) FROM "elb-access-log-db"."alb_logs" WHERE type='h2' AND year=2020 AND month=12 AND day=29;

This time the query takes only 1.89 seconds to complete, with 25.72 KB data scanned.

This query is faster and costs less (because less data is scanned) due to partition pruning.

Clean up

To avoid incurring future charges, delete the resources created in the Data Catalog, and delete the AWS Glue crawler.

Summary

In this post, we illustrated how to create an AWS Glue crawler that populates ALB logs metadata in the AWS Glue Data Catalog automatically with partitions by year, month, and day. With partition pruning, we can improve query performance and associated costs in Athena.

If you have questions or suggestions, please leave a comment.


About the Authors

Ray Wang is a Solutions Architect at AWS. With 8 years of experience in the IT industry, Ray is dedicated to building modern solutions on the cloud, especially in big data and machine learning. As a hungry go-getter, he passed all 12 AWS certificates to make his technical field not only deep but wide. He loves to read and watch sci-fi movies in his spare time.

Corvus Lee is a Data Lab Solutions Architect at AWS. He enjoys all kinds of data-related discussions with customers, from high-level like white boarding a data lake architecture, to the details of data modeling, writing Python/Spark code for data processing, and more.

How GE Aviation built cloud-native data pipelines at enterprise scale using the AWS platform

Post Syndicated from Alcuin Weidus original https://aws.amazon.com/blogs/big-data/how-ge-aviation-built-cloud-native-data-pipelines-at-enterprise-scale-using-the-aws-platform/

This post was co-written with Alcuin Weidus, Principal Architect from GE Aviation.

GE Aviation, an operating unit of GE, is a world-leading provider of jet and turboprop engines, as well as integrated systems for commercial, military, business, and general aviation aircraft. GE Aviation has a global service network to support these offerings.

From the turbosupercharger to the world’s most powerful commercial jet engine, GE’s history of powering the world’s aircraft features more than 90 years of innovation.

In this post, we share how GE Aviation built cloud-native data pipelines at enterprise scale using the AWS platform.

A focus on the foundation

At GE Aviation, we’ve been invested in the data space for many years. Witnessing the customer value and business insights that could be extracted from data at scale has propelled us forward. We’re always looking for new ways to evolve, grow, and modernize our data and analytics stack. In 2019, this meant moving from a traditional on-premises data footprint (with some specialized AWS use cases) to a fully AWS Cloud-native design. We understood the task was challenging, but we were committed to its success. We saw the tremendous potential in AWS, and were eager to partner closely with a company that has over a decade of cloud experience.

Our goal from the outset was clear: build an enterprise-scale data platform to accelerate and connect the business. Using the best of cloud technology would set us up to deliver on our goal and prioritize performance and reliability in the process. From an early point in the build, we knew that if we wanted to achieve true scale, we had to start with solid foundations. This meant first focusing on our data pipelines and storage layer, which serve as the ingest point for hundreds of source systems. Our team chose Amazon Simple Storage Service (Amazon S3) as our foundational data lake storage platform.

Amazon S3 was the first choice as it provides an optimal foundation for a data lake store delivering virtually unlimited scalability and 11 9s of durability. In addition to its scalable performance, it has ease-of-use features, native encryption, and access control capabilities. Equally important, Amazon S3 integrates with a broad portfolio of AWS services, such as Amazon Athena, the AWS Glue Data Catalog, AWS Glue ETL (extract, transform, and load) Amazon Redshift, Amazon Redshift Spectrum, and many third-party tools, providing a growing ecosystem of data management tools.

How we started

The journey started with an internal hackathon that brought cross-functional team members together. We organized around an initial design and established an architecture to start the build using serverless patterns. A combination of Amazon S3, AWS Glue ETL, and the Data Catalog were central to our solution. These three services in particular aligned to our broader strategy to be serverless wherever possible and build on top of AWS services that were experiencing heavy innovation in the way of new features.

We felt good about our approach and promptly got to work.

Solution overview

Our cloud data platform built on Amazon S3 is fed from a combination of enterprise ELT systems. We have an on-premises system that handles change data capture (CDC) workloads and another that works more in a traditional batch manner.

Our design has the on-premises ELT systems dropping files into an S3 bucket set up to receive raw data for both situations. We made the decision to standardize our processed data layer into Apache Parquet format for our cataloged S3 data lake in preparation for more efficient serverless consumption.

Our enterprise CDC system can already land files natively in Parquet; however, our batch files are limited to CSV, so the landing of CSV files triggers another serverless process to convert these files to Parquet using AWS Glue ETL.

The following diagram illustrates this workflow.

When raw data is present and ready in Apache Parquet format, we have an event-triggered solution that processes the data and loads it to another mirror S3 bucket (this is where our users access and consume the data).

Pipelines are developed to support loading at a table level. We have specific AWS Lambda functions to identify schema errors by comparing each file’s schema against the last successful run. Another function validates that a necessary primary key file is present for any CDC tables.

Data partitioning and CDC updates

When our preprocessing Lambda functions are complete, the files are processed in one of two distinct paths based on the table type. Batch table loads are by far the simpler of the two and are handled via a single Lambda function.

For CDC tables, we use AWS Glue ETL to load and perform the updates against our tables stored in the mirror S3 bucket. The AWS Glue job uses Apache Spark data frames to combine historical data, filter out deleted records, and union with any records inserted. For our process, updates are treated as delete-then-insert. After performing the union, the entire dataset is written out to the mirror S3 bucket in a newly created bucket partition.

The following diagram illustrates this workflow.

We write data into a new partition for each table load, so we can provide read consistency in a way that makes sense to our consuming business partners.

Building the Data Catalog

When each Amazon S3 mirror data load is complete, another separate serverless branch is triggered to handle catalog management.

The branch updates the location property within the catalog for pre-existing tables, indicating each newly added partition. When loading a table for the first time, we trigger a series of purpose-built Lambda functions to create the AWS Glue Data Catalog database (only required when it’s an entirely new source schema), create an AWS Glue crawler, start the crawler, and delete the crawler when it’s complete.

The following diagram illustrates this workflow.

These event-driven design patterns allow us to fully automate the catalog management piece of our architecture, which became a big win for our team because it lowered the operational overhead associated with onboarding new source tables. Every achievement like this mattered because it realized the potential the cloud had to transform how we build and support products across our technology organization.

Final implementation architecture and best practices

The solution evolved several times throughout the development cycle, typically due to learning something new in terms of serverless and cloud-native development, and further working with AWS Solutions Architect and AWS Professional Services teams. Along the way, we’ve discovered many cloud-native best practices and accelerated our serverless data journey to AWS.

The following diagram illustrates our final architecture.

We strategically added Amazon Simple Queue Service (Amazon SQS) between purpose-built Lambda functions to decouple the architecture. Amazon SQS gave our system a level of resiliency and operational observability that otherwise would have been a challenge.

Another best practice arose from using Amazon DynamoDB as a state table to help ensure our entire serverless integration pattern was writing to our mirror bucket with ACID guarantees.

On the topic of operational observability, we use Amazon EventBridge to capture and report on operational metadata like table load status, time of the last load, and row counts.

Bringing it all together

At the time of writing, we’ve had production workloads running through our solution for the better part of 14 months.

Production data is integrated from more than 30 source systems at present and totals several hundred tables. This solution has given us a great starting point for building our cloud data ecosystem. The flexibility and extensibility of AWS’s many services have been key to our success.

Appreciation for the AWS Glue Data Catalog has been an essential element. Without knowing it at the time we started building a data lake, we’ve been embracing a modern data architecture pattern and organizing around our transactionally consistent and cataloged mirror storage layer.

The introduction of a more seamless Apache Hudi experience within AWS has been a big win for our team. We’ve been busy incorporating Hudi into our CDC transaction pipeline and are thrilled with the results. We’re able to spend less time writing code managing the storage of our data, and more time focusing on the reliability of our system. This has been critical in our ability to scale. Our development pipeline has grown beyond 10,000 tables and more than 150 source systems as we approach another major production cutover.

Looking ahead, we’re intrigued by the potential for AWS Lake Formation goverened tables to further accelerate our momentum and management of CDC table loads.

Conclusion

Building our cloud-native integration pipeline has been a journey. What started as an idea and has turned into much more in a brief time. It’s hard to appreciate how far we’ve come when there’s always more to be done. That being said, the entire process has been extraordinary. We built deep and trusted partnerships with AWS, learned more about our internal value statement, and aligned more of our organization to a cloud-centric way of operating.

The ability to build solutions in a serverless manner opens up many doors for our data function and, most importantly, our customers. Speed to delivery and the pace of innovation is directly related to our ability to focus our engineering teams on business-specific problems while trusting a partner like AWS to do the heavy lifting of data center operations like racking, stacking, and powering servers. It also removes the operational burden of managing operating systems and applications with managed services. Finally, it allows us to focus on our customers and business process enablement rather than on IT infrastructure.

The breadth and depth of data and analytics services on AWS make it possible to solve our business problems by using the right resources to run whatever analysis is most appropriate for a specific need. AWS Data and Analytics has deep integrations across all layers of the AWS ecosystem, giving us the tools to analyze data using any approach quickly. We appreciate AWS’s continual innovation on behalf of its customers.


About the Authors

Alcuin Weidus is a Principal Data Architect for GE Aviation. Serverless advocate, perpetual data management student, and cloud native strategist, Alcuin is a data technology leader on a team responsible for accelerating technical outcomes across GE Aviation. Connect him on Linkedin.

Suresh Patnam is a Sr Solutions Architect at AWS; He works with customers to build IT strategy, making digital transformation through the cloud more accessible, focusing on big data, data lakes, and AI/ML. In his spare time, Suresh enjoys playing tennis and spending time with his family. Connect him on LinkedIn.

Orchestrate an ETL pipeline using AWS Glue workflows, triggers, and crawlers with custom classifiers

Post Syndicated from Mohit Mehta original https://aws.amazon.com/blogs/big-data/orchestrate-an-etl-pipeline-using-aws-glue-workflows-triggers-and-crawlers-with-custom-classifiers/

Extract, transform, and load (ETL) orchestration is a common mechanism for building big data pipelines. Orchestration for parallel ETL processing requires the use of multiple tools to perform a variety of operations. To simplify the orchestration, you can use AWS Glue workflows. This post demonstrates how to accomplish parallel ETL orchestration using AWS Glue workflows and triggers. We also demonstrate how to use custom classifiers with AWS Glue crawlers to classify fixed width data files.

AWS Glue workflows provide a visual and programmatic tool to author data pipelines by combining AWS Glue crawlers for schema discovery and AWS Glue Spark and Python shell jobs to transform the data. A workflow consists of one of more task nodes arranged as a graph. Relationships can be defined and parameters passed between task nodes to enable you to build pipelines of varying complexity. You can trigger workflows on a schedule or on-demand. You can track the progress of each node independently or the entire workflow, making it easier to troubleshoot your pipelines.

You need to define a custom classifier if you want to automatically create a table definition for data that doesn’t match AWS Glue built-in classifiers. For example, if your data originates from a mainframe system that utilizes a COBOL copybook data structure, you need to define a custom classifier when crawling the data to extract the schema. AWS Glue crawlers enable you to provide a custom classifier to classify your data. You can create a custom classifier using a Grok pattern, an XML tag, JSON, or CSV. When the crawler starts, it calls a custom classifier. If the classifier recognizes the data, it stores the classification and schema of the data in the AWS Glue Data Catalog.

Use case

For this post, we use automated clearing house (ACH) and check payments data ingestion as an example. ACH is a computer-based electronic network for processing transactions, and check payments is a negotiable transaction drawn against deposited funds, to pay the recipient a specific amount of funds on demand. Both ACH and check payments data files, which are in fixed width format, need to be ingested in the data lake incrementally over a time series. As part of the ingestion, these two data types need to be merged to get a consolidated view of all payments. ACH and check payment records are consolidated into a table that is useful for performing business analytics using Amazon Athena.

Solution overview

We define an AWS Glue crawler with a custom classifier for each file or data type. We use an AWS Glue workflow to orchestrate the process. The workflow triggers crawlers to run in parallel. When the crawlers are complete, the workflow starts an AWS Glue ETL job to process the input data files. The workflow tracks the completion of the ETL job that performs the data transformation and updates the table metadata in AWS Glue Data Catalog.

The following diagram illustrates a typical workflow for ETL workloads.

This post is accompanied by an AWS CloudFormation template that creates resources described by the AWS Glue workflow architecture. AWS CloudFormation enables you to model, provision, and manage AWS resources by treating infrastructure as code.

The CloudFormation template creates the following resources:

  • An AWS Glue workflow trigger that is started manually. The trigger starts two crawlers simultaneously for processing the data file related to ACH payments and check payments, respectively.
  • Custom classifiers for parsing incoming fixed width files containing ACH and check data.
  • AWS Glue crawlers:
    • A crawler to classify ACH payments in the RAW database. This crawler uses the custom classifier defined for ACH payments raw data. The crawler creates a table named ACH in the Data Catalog’s RAW database.
    • A crawler to classify check payments. This crawler uses the custom classifier defined for check payments raw data. This crawler creates a table named Check in the Data Catalog’s RAW database.
  • An AWS Glue ETL job that runs when both crawlers are complete. The ETL job reads the ACH and check tables, performs transformations using PySpark DataFrames, writes the output to a target Amazon Simple Storage Service (Amazon S3) location, and updates the Data Catalog for the processedpayment table with new hourly partition.
  • S3 buckets designated as RawDataBucket, ProcessedBucket, and ETLBucket. RawDataBucket holds the raw payment data as it is received from the source system, and ProcessedBucket holds the output after AWS Glue transformations have been applied. This data is suitable for consumption by end-users via Athena. ETLBucket contains the AWS Glue ETL code that is used for processing the data as part of the workflow.

Create resources with AWS CloudFormation

To create your resources with the CloudFormation template, complete the following steps:

  1. Choose Launch Stack:
  2. Choose Next.
  3. Choose Next again.
  4. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources.
  5. Choose Create stack.

Examine custom classifiers for fixed width files

Let’s review the definition of the custom classifier.

  1. On the AWS Glue console, choose Crawlers.
  2. Choose the crawler ach-crawler.
  3. Choose the RawACHClassifier classifier and review the Grok pattern.

This pattern assumes that the first 16 characters in the fixed width file are reserved for acct_num, and the next 10 characters are reserved for orig_pmt_date. When a crawler finds a classifier that matches the data, the classification string and schema are used in the definition of tables that are written to your Data Catalog.

Run the workflow

To run your workflow, complete the following steps:

  1. On the AWS Glue console, select the workflow that the CloudFormation template created.
  2. On the Actions menu, select Run.

This starts the workflow.

  1. When the workflow is complete, on the History tab, choose View run details.

You can review a graph depicting the workflow.

Examine the tables

In the Databases section under AWS Glue console, you can find a database named glue-database-raw, which contains two tables named ach and check. These tables are created by the respective AWS Glue crawler using the custom classification pattern specified.

Query processed data

To query your data, complete the following steps:

  1. On the AWS Glue console, select the database glue-database-processed.
  2. On the Action menu, choose View data.

The Athena console opens. If this is your first time using Athena, you need to set up the S3 bucket to store the query result.

  1. In the query editor, run the following query:
select acct_num,pymt_type,count(pymt_type)
from glue_database_processed.processedpayment 
group by acct_num,pymt_type;

You can see the count of payment type in each account displayed from the processedpayment table.

Clean up

To avoid incurring ongoing charges, clean up your infrastructure by deleting the CloudFormation stack. However, you first need to empty your S3 buckets.

  1. On the Amazon S3 console, select each bucket created by the CloudFormation stack.
  2. Choose Empty.
  3. On the AWS CloudFormation console, select the stack you created.
  4. Choose Delete.

Conclusion

In this post we explored how AWS Glue Workflows enable data engineers to build and orchestrate a data pipeline to discover, classify and process standard and non-standard data files. We also discussed how to leverage AWS Glue Workflow along with AWS Glue Custom Classifier, AWS Glue Crawlers and AWS Glue ETL capabilities to ingest from multiple sources into a data lake. We also walked through how you can use Amazon Athena to perform interactive SQL analysis.

For more details on using AWS Glue Workflows, see Performing Complex ETL Activities Using Blueprints and Workflows in AWS Glue.

For more information on AWS Glue ETL jobs, see Build a serverless event-driven workflow with AWS Glue.

For More information on using Athena, see Getting Started with Amazon Athena.


Appendix: Create a regular expression pattern for a custom classifier

Grok is a tool that you can use to parse textual data given a matching pattern. A Grok pattern is a named set of regular expressions (regex) that are used to match data one line at a time. AWS Glue uses Grok patterns to infer the schema of your data. When a Grok pattern matches your data, AWS Glue uses the pattern to determine the structure of your data and map it into fields. AWS Glue provides many built-in patterns, or you can define your own. When defining you own pattern, it’s a best practice to test the regular expression prior to setting up the AWS Glue classifier.

One way to do that is to build and test your regular expression by using https://regex101.com/#PYTHON. For this, you need to take a small sample from your input data. You can visualize the output of your regular expression by completing the following steps:

  1. Copy the following rows from the source file to the test string section.
    111111111ABCDEX 01012019000A2345678A23456S12345678901012ABCDEFGHMJOHN JOE                           123A5678ABCDEFGHIJK      ISECNAMEA                           2019-01-0100000123123456  VAC12345678901234
    211111111BBCDEX 02012019001B2345678B23456712345678902012BBCDEFGHMJOHN JOHN                          123B5678BBCDEFGHIJK      USECNAMEB                           2019-02-0100000223223456  XAC12345678901234

  2. Construct the regex pattern based on the specifications. For example, the first 16 characters represent acct_num followed by orig_pmt_date of 10 characters. You should end up with a pattern as follows:
(?<acct_num>.{16})(?<orig_pmt_date>.{10})(?<orig_rfc_rtn_num>.{8})(?<trace_seq_num>.{7})(?<cls_pmt_code>.{1})(?<orig_pmt_amt>.{14})(?<aas_code>.{8})(?<line_code>.{1})(?<payee_name>.{35})(?<fi_rtn_num>.{8})(?<dpst_acct_num>.{17})(?<ach_pmt_acct_ind>.{1})(?<scndry_payee_name>.{35})(?<r_orig_pmt_date>.{10})(?<r_orig_rfc_rtn_num>.{8})(?<r_trace_seq_num>.{7})(?<type_pmt_code>.{1})(?<va_stn_code>.{2})(?<va_approp_code>.{1})(?<schedule_num>.{14})

After you validate your pattern, you can create a custom classifier and attach it to an AWS Glue crawler.


About the Authors

Mohit Mehta is a leader in the AWS Professional Services Organization with expertise in AI/ML and big data technologies. Prior to joining AWS, Mohit worked as a digital transformation executive at a Fortune 100 financial services organization. Mohit holds an M.S in Computer Science, all AWS certifications, an MBA from College of William and Mary, and a GMP from Michigan Ross School of Business.

Meenakshi Ponn Shankaran is Senior Big Data Consultant in the AWS Professional Services Organization with expertise in big data. Meenakshi is a SME on working with big data use cases at scale and has experience in architecting and optimizing workloads processing petabyte-scale data lakes. When he is not solving big data problems, he likes to coach the game of cricket.

Create a serverless event-driven workflow to ingest and process Microsoft data with AWS Glue and Amazon EventBridge

Post Syndicated from Venkata Sistla original https://aws.amazon.com/blogs/big-data/create-a-serverless-event-driven-workflow-to-ingest-and-process-microsoft-data-with-aws-glue-and-amazon-eventbridge/

Microsoft SharePoint is a document management system for storing files, organizing documents, and sharing and editing documents in collaboration with others. Your organization may want to ingest SharePoint data into your data lake, combine the SharePoint data with other data that’s available in the data lake, and use it for reporting and analytics purposes. AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. AWS Glue provides all the capabilities needed for data integration so that you can start analyzing your data and putting it to use in minutes instead of months.

Organizations often manage their data on SharePoint in the form of files and lists, and you can use this data for easier discovery, better auditing, and compliance. SharePoint as a data source is not a typical relational database and the data is mostly semi structured, which is why it’s often difficult to join the SharePoint data with other relational data sources. This post shows how to ingest and process SharePoint lists and files with AWS Glue and Amazon EventBridge, which enables you to join other data that is available in your data lake. We use SharePoint REST APIs with a standard open data protocol (OData) syntax. OData advocates a standard way of implementing REST APIs that allows for SQL-like querying capabilities. OData helps you focus on your business logic while building RESTful APIs without having to worry about the various approaches to define request and response headers, query options, and so on.

AWS Glue event-driven workflows

Unlike a traditional relational database, SharePoint data may or may not change frequently, and it’s difficult to predict the frequency at which your SharePoint server generates new data, which makes it difficult to plan and schedule data processing pipelines efficiently. Running data processing frequently can be expensive, whereas scheduling pipelines to run infrequently can deliver cold data. Similarly, triggering pipelines from an external process can increase complexity, cost, and job startup time.

AWS Glue supports event-driven workflows, a capability that lets developers start AWS Glue workflows based on events delivered by EventBridge. The main reason to choose EventBridge in this architecture is because it allows you to process events, update the target tables, and make information available to consume in near-real time. Because frequency of data change in SharePoint is unpredictable, using EventBridge to capture events as they arrive enables you to run the data processing pipeline only when new data is available.

To get started, you simply create a new AWS Glue trigger of type EVENT and place it as the first trigger in your workflow. You can optionally specify a batching condition. Without event batching, the AWS Glue workflow is triggered every time an EventBridge rule matches, which may result in multiple concurrent workflows running. AWS Glue protects you by setting default limits that restrict the number of concurrent runs of a workflow. You can increase the required limits by opening a support case. Event batching allows you to configure the number of events to buffer or the maximum elapsed time before firing the particular trigger. When the batching condition is met, a workflow run is started. For example, you can trigger your workflow when 100 files are uploaded in Amazon Simple Storage Service (Amazon S3) or 5 minutes after the first upload. We recommend configuring event batching to avoid too many concurrent workflows, and optimize resource usage and cost.

To illustrate this solution better, consider the following use case for a wine manufacturing and distribution company that operates across multiple countries. They currently host all their transactional system’s data on a data lake in Amazon S3. They also use SharePoint lists to capture feedback and comments on wine quality and composition from their suppliers and other stakeholders. The supply chain team wants to join their transactional data with the wine quality comments in SharePoint data to improve their wine quality and manage their production issues better. They want to capture those comments from the SharePoint server within an hour and publish that data to a wine quality dashboard in Amazon QuickSight. With an event-driven approach to ingest and process their SharePoint data, the supply chain team can consume the data in less than an hour.

Overview of solution

In this post, we walk through a solution to set up an AWS Glue job to ingest SharePoint lists and files into an S3 bucket and an AWS Glue workflow that listens to S3 PutObject data events captured by AWS CloudTrail. This workflow is configured with an event-based trigger to run when an AWS Glue ingest job adds new files into the S3 bucket. The following diagram illustrates the architecture.

To make it simple to deploy, we captured the entire solution in an AWS CloudFormation template that enables you to automatically ingest SharePoint data into Amazon S3. SharePoint uses ClientID and TenantID credentials for authentication and uses Oauth2 for authorization.

The template helps you perform the following steps:

  1. Create an AWS Glue Python shell job to make the REST API call to the SharePoint server and ingest files or lists into Amazon S3.
  2. Create an AWS Glue workflow with a starting trigger of EVENT type.
  3. Configure CloudTrail to log data events, such as PutObject API calls to CloudTrail.
  4. Create a rule in EventBridge to forward the PutObject API events to AWS Glue when they’re emitted by CloudTrail.
  5. Add an AWS Glue event-driven workflow as a target to the EventBridge rule. The workflow gets triggered when the SharePoint ingest AWS Glue job adds new files to the S3 bucket.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Configure SharePoint server authentication details

Before launching the CloudFormation stack, you need to set up your SharePoint server authentication details, namely, TenantID, Tenant, ClientID, ClientSecret, and the SharePoint URL in AWS Systems Manager Parameter Store of the account you’re deploying in. This makes sure that no authentication details are stored in the code and they’re fetched in real time from Parameter Store when the solution is running.

To create your AWS Systems Manager parameters, complete the following steps:

  1. On the Systems Manager console, under Application Management in the navigation pane, choose Parameter Store.
    systems manager
  2. Choose Create Parameter.
  3. For Name, enter the parameter name /DATALAKE/GlueIngest/SharePoint/tenant.
  4. Leave the type as string.
  5. Enter your SharePoint tenant detail into the value field.
  6. Choose Create parameter.
  7. Repeat these steps to create the following parameters:
    1. /DataLake/GlueIngest/SharePoint/tenant
    2. /DataLake/GlueIngest/SharePoint/tenant_id
    3. /DataLake/GlueIngest/SharePoint/client_id/list
    4. /DataLake/GlueIngest/SharePoint/client_secret/list
    5. /DataLake/GlueIngest/SharePoint/client_id/file
    6. /DataLake/GlueIngest/SharePoint/client_secret/file
    7. /DataLake/GlueIngest/SharePoint/url/list
    8. /DataLake/GlueIngest/SharePoint/url/file

Deploy the solution with AWS CloudFormation

For a quick start of this solution, you can deploy the provided CloudFormation stack. This creates all the required resources in your account.

The CloudFormation template generates the following resources:

  • S3 bucket – Stores data, CloudTrail logs, job scripts, and any temporary files generated during the AWS Glue extract, transform, and load (ETL) job run.
  • CloudTrail trail with S3 data events enabled – Enables EventBridge to receive PutObject API call data in a specific bucket.
  • AWS Glue Job – A Python shell job that fetches the data from the SharePoint server.
  • AWS Glue workflow – A data processing pipeline that is comprised of a crawler, jobs, and triggers. This workflow converts uploaded data files into Apache Parquet format.
  • AWS Glue database – The AWS Glue Data Catalog database that holds the tables created in this walkthrough.
  • AWS Glue table – The Data Catalog table representing the Parquet files being converted by the workflow.
  • AWS Lambda function – The AWS Lambda function is used as an AWS CloudFormation custom resource to copy job scripts from an AWS Glue-managed GitHub repository and an AWS Big Data blog S3 bucket to your S3 bucket.
  • IAM roles and policies – We use the following AWS Identity and Access Management (IAM) roles:
    • LambdaExecutionRole – Runs the Lambda function that has permission to upload the job scripts to the S3 bucket.
    • GlueServiceRole – Runs the AWS Glue job that has permission to download the script, read data from the source, and write data to the destination after conversion.
    • EventBridgeGlueExecutionRole – Has permissions to invoke the NotifyEvent API for an AWS Glue workflow.
    • IngestGlueRole – Runs the AWS Glue job that has permission to ingest data into the S3 bucket.

To launch the CloudFormation stack, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For pS3BucketName, enter the unique name of your new S3 bucket.
  5. Leave pWorkflowName and pDatabaseName as the default.

cloud formation 1

  1. For pDatasetName, enter the SharePoint list name or file name you want to ingest.
  2. Choose Next.

cloud formation 2

  1. On the next page, choose Next.
  2. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  3. Choose Create.

It takes a few minutes for the stack creation to complete; you can follow the progress on the Events tab.

You can run the ingest AWS Glue job either on a schedule or on demand. As the job successfully finishes and ingests data into the raw prefix of the S3 bucket, the AWS Glue workflow runs and transforms the ingested raw CSV files into Parquet files and loads them into the transformed prefix.

Review the EventBridge rule

The CloudFormation template created an EventBridge rule to forward S3 PutObject API events to AWS Glue. Let’s review the configuration of the EventBridge rule:

  1. On the EventBridge console, under Events, choose Rules.
  2. Choose the rule s3_file_upload_trigger_rule-<CloudFormation-stack-name>.
  3. Review the information in the Event pattern section.

event bridge

The event pattern shows that this rule is triggered when an S3 object is uploaded to s3://<bucket_name>/data/SharePoint/tablename_raw/. CloudTrail captures the PutObject API calls made and relays them as events to EventBridge.

  1. In the Targets section, you can verify that this EventBridge rule is configured with an AWS Glue workflow as a target.

event bridge target section

Run the ingest AWS Glue job and verify the AWS Glue workflow is triggered successfully

To test the workflow, we run the ingest-glue-job-SharePoint-file job using the following steps:

  1. On the AWS Glue console, select the ingest-glue-job-SharePoint-file job.

glue job

  1. On the Action menu, choose Run job.

glue job action menu

  1. Choose the History tab and wait until the job succeeds.

glue job history tab

You can now see the CSV files in the raw prefix of your S3 bucket.

csv file s3 location

Now the workflow should be triggered.

  1. On the AWS Glue console, validate that your workflow is in the RUNNING state.

glue workflow running status

  1. Choose the workflow to view the run details.
  2. On the History tab of the workflow, choose the current or most recent workflow run.
  3. Choose View run details.

glue workflow visual

When the workflow run status changes to Completed, let’s check the converted files in your S3 bucket.

  1. Switch to the Amazon S3 console, and navigate to your bucket.

You can see the Parquet files under s3://<bucket_name>/data/SharePoint/tablename_transformed/.

parquet file s3 location

Congratulations! Your workflow ran successfully based on S3 events triggered by uploading files to your bucket. You can verify everything works as expected by running a query against the generated table using Amazon Athena.

Sample wine dataset

Let’s analyze a sample red wine dataset. The following screenshot shows a SharePoint list that contains various readings that relate to the characteristics of the wine and an associated wine category. This is populated by various wine tasters from multiple countries.

redwine dataset

The following screenshot shows a supplier dataset from the data lake with wine categories ordered per supplier.

supplier dataset

We process the red wine dataset using this solution and use Athena to query the red wine data and supplier data where wine quality is greater than or equal to 7.

athena query and results

We can visualize the processed dataset using QuickSight.

Clean up

To avoid incurring unnecessary charges, you can use the AWS CloudFormation console to delete the stack that you deployed. This removes all the resources you created when deploying the solution.

Conclusion

Event-driven architectures provide access to near-real-time information and help you make business decisions on fresh data. In this post, we demonstrated how to ingest and process SharePoint data using AWS serverless services like AWS Glue and EventBridge. We saw how to configure a rule in EventBridge to forward events to AWS Glue. You can use this pattern for your analytical use cases, such as joining SharePoint data with other data in your lake to generate insights, or auditing SharePoint data and compliance requirements.


About the Author

Venkata Sistla is a Big Data & Analytics Consultant on the AWS Professional Services team. He specializes in building data processing capabilities and helping customers remove constraints that prevent them from leveraging their data to develop business insights.

Introducing Amazon S3 shuffle in AWS Glue

Post Syndicated from Anubhav Awasthi original https://aws.amazon.com/blogs/big-data/introducing-amazon-s3-shuffle-in-aws-glue/

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. In AWS Glue, you can use Apache Spark, which is an open-source, distributed processing system for your data integration tasks and big data workloads. Apache Spark utilizes in-memory caching and optimized query execution for fast analytic queries against your datasets, which are split into multiple partitions so that you can execute different transformations in parallel.

Shuffling is an important step in a Spark job whenever data is rearranged between partitions. The groupByKey(), reduceByKey(), join(), and distinct() are some examples of wide transformations that can cause a shuffle. During a shuffle, data is written to disk and transferred across the network. As a result, the shuffle operation is often constrained by the available local disk capacity, or data skew, which can cause straggling executors. Spark often throws a No space left on device or MetadataFetchFailedException error when there is not enough disk space left on the executor and there is no recovery.

This post introduces a new Spark shuffle manager available in AWS Glue that disaggregates Spark compute and shuffle storage by utilizing Amazon Simple Storage Service (Amazon S3) to store Spark shuffle and spill files. Using Amazon S3 for Spark shuffle storage lets you run data-intensive workloads much more reliably.

Understanding the shuffle operation in AWS Glue

Spark creates physical plans for running your workflow, called Directed Acyclic Graphs (DAGs). The DAG represents a series of transformations on your dataset, each resulting in a new immutable RDD. All of the transformations in Spark are lazy, in that they are not computed until an action is called to generate results. There are two types of transformations:

  • Narrow transformation – Such as map, filter, union, and mapPartition, where each input partition contributes to only one output partition.
  • Wide transformation – Such as join, groupBykey, reduceByKey, and repartition, where each input partition contributes to many output partitions.

In Spark, a shuffle occurs whenever data is rearranged between partitions. This is required because the wide transformation needs information from other partitions in order to complete its processing. Spark gathers the required data from each partition and combines it into a new partition. During a shuffle phase, all Spark map tasks write shuffle data to a local disk that is then transferred across the network and fetched by Spark reduce tasks. With AWS Glue, workers write shuffle data on local disk volumes attached to the AWS Glue workers.

In addition to shuffle writes, Spark uses local disk to spill data from memory that exceeds the heap space defined by the spark.memory.fraction configuration parameter. Shuffle spill (memory) is the size of the de-serialized form of the data in the memory at the time when the worker spills it. Whereas shuffle spill (disk) is the size of the serialized form of the data on disk after the worker has spilled.

Challenges

Spark uses local disk for storing intermediate shuffle and shuffle spills. This introduces the following key challenges:

  • Hitting local storage limits – If you have a Spark job that computes transformations over a large amount of data, and results in either too much spill or shuffle or both, then you might get a failed job with  java.io.IOException: No space left on device exception if the underlying storage has filled up.
  • Co-location of storage with executors – If an executor is lost, then shuffle files are lost as well. This leads to several task and stage retries, as Spark tries to recompute stages in order to recover lost shuffle data. Spark natively provides an external shuffle service that lets it store shuffle data independent to the life of executors. But the shuffle service itself becomes a point of failure and must always be up in order to serve shuffle data. Additionally, shuffles are still stored on local disk, which might run out of space for a large job.

To illustrate one of the preceding scenarios, let’s use the query q80.sql from the standard TPC-DS 3 TB dataset as an example. This query attempts to calculate the total sales, returns, and eventual profit realized during a specific time frame. It involves multiple wide transformations (shuffles) caused by left outer join, group by, and union all. Let’s run the following query with 10 G1.x AWS Glue DPU (data processing unit). For the G.1X worker type, each worker maps to 1 DPU and 1 executor. 10 G1.x workers account for a total of 640GB of disk space. See the following sql query:

with ssr as
 (select  s_store_id as store_id,
          sum(ss_ext_sales_price) as sales,
          sum(coalesce(sr_return_amt, 0)) as returns,
          sum(ss_net_profit - coalesce(sr_net_loss, 0)) as profit
  from store_sales left outer join store_returns on
         (ss_item_sk = sr_item_sk and ss_ticket_number = sr_ticket_number),
     date_dim, store, item, promotion
 where ss_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
       and ss_store_sk = s_store_sk
       and ss_item_sk = i_item_sk
       and i_current_price > 50
       and ss_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by s_store_id),
 csr as
 (select  cp_catalog_page_id as catalog_page_id,
          sum(cs_ext_sales_price) as sales,
          sum(coalesce(cr_return_amount, 0)) as returns,
          sum(cs_net_profit - coalesce(cr_net_loss, 0)) as profit
  from catalog_sales left outer join catalog_returns on
         (cs_item_sk = cr_item_sk and cs_order_number = cr_order_number),
     date_dim, catalog_page, item, promotion
 where cs_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
        and cs_catalog_page_sk = cp_catalog_page_sk
       and cs_item_sk = i_item_sk
       and i_current_price > 50
       and cs_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by cp_catalog_page_id),
 wsr as
 (select  web_site_id,
          sum(ws_ext_sales_price) as sales,
          sum(coalesce(wr_return_amt, 0)) as returns,
          sum(ws_net_profit - coalesce(wr_net_loss, 0)) as profit
  from web_sales left outer join web_returns on
         (ws_item_sk = wr_item_sk and ws_order_number = wr_order_number),
     date_dim, web_site, item, promotion
 where ws_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
        and ws_web_site_sk = web_site_sk
       and ws_item_sk = i_item_sk
       and i_current_price > 50
       and ws_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by web_site_id)
 select channel, id, sum(sales) as sales, sum(returns) as returns, sum(profit) as profit
 from (select
        'store channel' as channel, concat('store', store_id) as id, sales, returns, profit
      from ssr
      union all
      select
        'catalog channel' as channel, concat('catalog_page', catalog_page_id) as id,
        sales, returns, profit
      from csr
      union all
      select
        'web channel' as channel, concat('web_site', web_site_id) as id, sales, returns, profit
      from  wsr) x
 group by rollup (channel, id)
 order by channel, id

The following screenshot shows the AWS Glue job run details from the Apache Spark web UI:

The job runs for about 1 hour and 25 minutes, then we start observing task failures. Spark ends up stopping the stage and canceling the job when the task retries also fail.

The following screenshots show the aggregated metrics for the failed stage, as well as how much data is spilled to disk by individual executors:

As seen in the Shuffle Write metric from the above Spark UI screenshot, all 10 workers shuffle over 50 GB of data. Further writes aren’t allowed, and tasks start failing with a “No space left on device” error.

The remaining storage is occupied by data that is spilled to disk, as seen in the Shuffle Spill (Disk) metric from the above Spark UI screenshot. This failed job is a classic example of a data-intensive transformation where Spark is both shuffling and spilling to disk when executor memory is filled.

Solution overview

We have various methods for overcoming the disk space error:

  • Scale out – Increase the number of workers. This incurs an increase in cost. However, scaling out might not always work, especially if your data is heavily skewed on a few keys. Fixing skewness will require considerable modifications to your Spark application logic.
  • Increase shuffle partitions – Increasing the shuffle partitions can sometimes help overcome space errors. However, this might not always work, and therefore is unreliable.
  • Disaggregate compute and storage – This approach presents several of the advantages of not only scaling storage for large shuffles, but also adding reliability in the event of node failures because shuffle data is independently stored. Following are few implementations of this disaggregated approach:
    • Dedicated intermediate storage cluster – In this approach, you use an additional fleet of shuffle services to serve intermediate shuffle. It has several advantages, such as merging shuffle files and sequential I/O, but it introduces an overhead of fleet maintenance from both operations, as well as a cost standpoint. For examples of this approach, see Cosco: An Efficient Facebook-Scale Shuffle Service and Zeus: Uber’s Highly Scalable and Distributed Shuffle as a Service.
    • Serverless storage – AWS Glue implements a different approach in which you utilize Amazon S3, a cost-effective managed and serverless storage, to store intermediate shuffle data. This design does not depend upon a dedicated daemon, such as shuffle service, to preserve shuffle files. This lets you elastically scale your Spark job without the overhead of running, operating, and maintaining additional storage or compute nodes.

With AWS Glue 2.0, you can now use Amazon S3 to store Spark shuffle and spill data. Amazon S3 is an object storage service that offers industry-leading scalability, data availability, security, and performance. This gives complete elasticity to Spark jobs, thereby allowing you to run your most data intensive workloads reliably.

The following diagram illustrates how Spark map tasks write the shuffle and spill files to the given Amazon S3 shuffle bucket. Reducer tasks consider the shuffle blocks as remote blocks and read them from the same shuffle bucket.

Use Amazon S3 to store shuffle and spill data

The following job parameters enable and tune Spark to use S3 buckets for storing shuffle and spill data. You can also enable at-rest encryption when writing shuffle data to Amazon S3 by using security configuration settings.

Key  Value  Explanation
–write-shuffle-files-to-s3 TRUE This is the main flag, which tells Spark to use S3 buckets for writing and reading shuffle data.
–write-shuffle-spills-to-s3 TRUE This is an optional flag that lets you offload spill files to S3 buckets, which provides additional resiliency to your Spark job. This is only required for large workloads that spill a lot of data to disk. This flag is disabled by default.
–conf spark.shuffle.glue.s3ShuffleBucket=S3://<shuffle-bucket> This is also optional, and it specifies the S3 bucket where we write the shuffle files. By default, we use —TempDir/shuffle-data.

You can also use the AWS Glue Studio console to enable Amazon S3 based shuffle or spill. You can choose the preceding properties from pre-populated options in the Job parameters section.

Results

Let’s run the same q80.sql query with Amazon S3 shuffle enabled. We can view the shuffle files stored in the S3 bucket in the following format:

shuffle_<jobid>_<mapperid>_<reducerid>.data/index

Two kinds of files are created:

  • Data – Stores the shuffle output of the current task
  • Index – Stores the classification information of the data in the data file by storing partition offsets

The following screenshots shows example shuffle directories and shuffle files:

The following screenshot shows the aggregated metrics from the Spark UI:

The following are a few key highlights:

  • q80.sql, which had failed earlier after 1 hour and 25 minutes, and was able to complete only 13 out of 18 stages, finished successfully in about 2 hours and 53 minutes, completing all 18 stages.
  • We were able to shuffle 479.7 GB of data without worrying about storage limits.
  • Additional workers aren’t required to scale storage, which provides substantial cost savings.

Considerations and best practices

Keep in mind the following best practices when considering this solution:

  • This feature is recommended when you want to ensure the reliable execution of your data intensive workloads that create a large amount of shuffle or spill data. Writing and reading shuffle files from Amazon S3 is marginally slower when compared to local disk for our experiments with TPC-DS queries. S3 shuffle performance would be impacted by the number and size of shuffle files. For example, S3 could be slower for reads as compared to local storage if you have a large number of small shuffle files or partitions in your Spark application.
  • You can use this feature if your job frequently suffers from No space left on device issues.
  • You can use this feature if your job frequently suffers fetch failure issues (org.apache.spark.shuffle.MetadataFetchFailedException).
  • You can use this feature if your data is skewed.
  • We recommend setting the S3 bucket lifecycle policies on the shuffle bucket (spark.shuffle.glue.s3ShuffleBucket) in order to clean up old shuffle data.
  • At the time of writing this blog, this feature is currently available on AWS Glue 2.0 and Spark 2.4.3.

Conclusion

This post discussed how we can independently scale storage in AWS Glue without adding additional workers. With this feature, you can expect jobs that are processing terabytes of data to run much more reliably. Happy shuffling!


About the Authors

Anubhav Awasthi is a Big Data Specialist Solutions Architect at AWS. He works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

Rajendra Gujja is a Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and everything and anything to do with the data.

Mohit Saxena is a Software Engineering Manager on the AWS Glue team. His team works on distributed systems for efficiently managing data lakes on AWS and optimizes Apache Spark for performance and reliability.

Now Available: Updated guidance on the Data Analytics Lens for AWS Well-Architected Framework

Post Syndicated from Wallace Printz original https://aws.amazon.com/blogs/big-data/now-available-updated-guidance-on-the-data-analytics-lens-for-aws-well-architected-framework/

Nearly all businesses today require some form of data analytics processing, from auditing user access to generating sales reports. For all your analytics needs, the Data Analytics Lens for AWS Well-Architected Framework provides prescriptive guidance to help you assess your workloads and identify best practices aligned to the AWS Well-Architected Pillars: Operational Excellence, Security, Reliability, Performance Efficiency, and Cost Optimization. Today, we’re pleased to announce a completely revised and updated version of the Data Analytics Lens whitepaper.

Self-assess with Well-Architected design principles

The updated version of the Data Analytics Lens whitepaper has been revised to provide guidance to CxOs as well as all data personas. Within each of the five Well-Architected Pillars, we provide top-level design principles for CxOs to quickly identify areas for teams and fundamental rules that analytics workloads designers should follow. Each design principle is followed by a series of questions and best practices that architects and system designers can use to perform self-assessments. Additionally, the Data Analytics Lens includes suggestions that prescriptively explain steps to implement best practices useful for implementation teams.

For example, the Security Pillar design principle “Control data access” works with the best practice to build user identity solutions that uniquely identify people and systems. The associated suggestion for this best practice is to centralize workforce identities, which details how to use this principle and includes links to more documentation on the suggestion.

“Building Data Analytics platform or workloads is one of the complex architecture patterns. It involves multi-layered approach such as Data Ingestion, Data Landing, Transformation Layer, Analytical/Insight and Reporting. Choices of technology and service for each of these layers are wide. The AWS Well-Architected Analytics Lens helps us to design and validate with great confidence against each of the pillars. Now Cognizant Architects can perform assessments using the Data Analytics Lens to validate and help build secure, scalable and innovative data solutions for customers.”

– Supriyo Chakraborty, Principal Architect & Head of Data Engineering Guild, Cognizant Germany
– Somasundaram Janavikulam, Cloud Enterprise Architect & Well Architected Partner Program Lead, Cognizant

In addition to performing your own assessment, AWS can provide a guided experience through reviewing your workload with a Well-Architected Framework Review engagement. For customers building data analytics workloads with AWS Professional Services, our teams of Data Architects can perform assessments using the Data Analytics Lens during the project engagements. This provides you with an objective assessment of your workloads and guidance on future improvements. The integration is available now for customers of the AWS Data Lake launch offering, with additional Data Analytics offerings coming in 2022. Reach out to your AWS Account Team if you’d like to know more about these guided Reviews.

Updated architectural patterns and scenarios

In this version of the Data Analytics Lens, we have also revised the discussion of data analytics patterns and scenarios to keep up with the industry and modern data analytics practices. Each scenario includes sections on characteristics that help you plan when developing systems for that scenario, a reference architecture to visualize and explain how the components work together, and configuration notes to help you properly configure your solution.

This version covers the following topics:

  • Building a modern data architecture (formerly Lake House Architecture)
  • Organize around data domains by delivering data as a product using a data mesh
  • Efficiently and securely provide batch data processing
  • Use streaming ingest and stream processing for real-time workloads
  • Build operational analytics systems to improve business processes and performance
  • Provide data visualization securely and cost-effectively at scale

Changed from the first release, the machine learning and tenant analytics scenarios have been migrated to a separate Machine Learning Lens whitepaper and SaaS Lens whitepaper.

Conclusion

We expect this updated version will provide better guidance to validate your existing architectures, as well as provide recommendations for any gaps that identified.

For more information about building your own Well-Architected systems using the Data Analytics Lens, see the Data Analytics Lens whitepaper.

Special thanks to everyone across the AWS Solution Architecture and Data Analytics communities who contributed. These contributions encompassed diverse perspectives, expertise, and experiences in developing the new AWS Well-Architected Data Analytics Lens.


About the Authors

Wallace Printz is a Senior Solutions Architect based in Austin, Texas. He helps customers across Texas transform their businesses in the cloud. He has a background in semiconductors, R&D, and machine learning.

Indira Balakrishnan is a Senior Solutions Architect in the AWS Analytics Specialist SA Team. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems using data-driven decisions. Outside of work, she volunteers at her kids’ activities and spends time with her family.

How Parametric Built Audit Surveillance using AWS Data Lake Architecture

Post Syndicated from Raghavarao Sodabathina original https://aws.amazon.com/blogs/architecture/how-parametric-built-audit-surveillance-using-aws-data-lake-architecture/

Parametric Portfolio Associates (Parametric), a wholly owned subsidiary of Morgan Stanley, is a registered investment adviser. Parametric provides investment advisory services to individual and institutional investors around the world. Parametric manages over 100,000 client portfolios with assets under management exceeding $400B (as of 9/30/21).

As a registered investment adviser, Parametric is subject to numerous regulatory requirements. The Parametric Compliance team conducts regular reviews on the firm’s portfolio management activities. To accomplish this, the organization needs both active and archived audit data to be readily available.

Parametric’s on-premises data lake solution was based on an MS-SQL server. They used an Apache Hadoop platform for their data storage, data management, and analytics. Significant gaps existed with the on-premises solution, which complicated audit processes. They were spending a large amount of effort on system maintenance, operational management, and software version upgrades. This required expensive consulting services and challenges with keeping the maintenance windows updated. This limited their agility, and also impacted their ability to derive more insights and value from their data. In an environment of rapid growth, adoption of more sophisticated analytics tools and processes has been slower to evolve.

In this blog post, we will show how Parametric implemented their Audit Surveillance Data Lake on AWS with purpose-built fully managed analytics services. With this solution, Parametric was able to respond to various audit requests within hours rather than days or weeks. This resulted in a system with a cost savings of 5x, with no data growth. Additionally, this new system can seamlessly support a 10x data growth.

Audit surveillance platform

The Parametric data management office (DMO) was previously running their data workloads using an on-premises data lake, which ran on the Hortonworks data platform of Apache Hadoop. This platform wasn’t up to date, and Parametric’s hardware was reaching end-of-life. Parametric was faced with a decision to either reinvest in their on-premises infrastructure or modernize their infrastructure using a modern data analytics platform on AWS. After doing a detailed cost/benefit analysis, the DMO calculated a 5x cost savings by using AWS. They decided to move forward and modernize with AWS due to these cost benefits, in addition to elasticity and security features.

The PPA compliance team asked the DMO to provide an enterprise data service to consume data from a data lake. This data was destined for downstream applications and ad-hoc data querying capabilities. It was accessed via standard JDBC tools and user-friendly business intelligence dashboards. The goal was to ensure that seven years of audit data would be readily available.

The DMO team worked with AWS to conceptualize an audit surveillance data platform architecture and help accelerate the implementation. They attended a series of AWS Immersion Days focusing on AWS fundamentals, Data Lakes, Devops, Amazon EMR, and serverless architectures. They later were involved in a four-day AWS Data Lab with AWS SMEs to create a data lake. The first use case in this Lab was creating the Audit Surveillance system on AWS.

Audit surveillance architecture on AWS

The following diagram shows the Audit Surveillance data lake architecture on AWS by using AWS purpose-built analytics services.

Figure 1. Audit Surveillance data lake architecture diagram

Figure 1. Audit Surveillance data lake architecture diagram

Architecture flow

  1. User personas: As first step, the DMO team identified three user personas for the Audit Surveillance system on AWS.
    • Data service compliance users who would like to consume audit surveillance data from the data lake into their respective applications through an enterprise data service.
    • Business users who would like to create business intelligence dashboards using a BI tool to audit data for compliance needs.
    • Complaince IT users who would like to perform ad-hoc queries on the data lake to perform analytics using an interactive query tool.
  2. Data ingestion: Data is ingested into Amazon Simple Storage Service (S3) from different on-premises data sources by using AWS Lake Formation blueprints. AWS Lake Formation provides workflows that define the data source and schedule to import data into the data lake. It is a container for AWS Glue crawlers, jobs, and triggers that are used to orchestrate the process to load and update the data lake.
  3. Data storage: Parametric used Amazon S3 as a data storage to build an Audit Surveillance data lake, as it has unmatched 11 nines of durability and 99.99% availability. The existing Hadoop storage was replaced with Amazon S3. The DMO team created a drop zone (raw), an analytics zone (transformed), and curated (enriched) storage layers for their data lake on AWS.
  4. Data cataloging: AWS Glue Data Catalog was the central catalog used to store and manage metadata for all datasets hosted in the Audit Surveillance data lake. The existing Hadoop metadata store was replaced with AWS Glue Data Catalog. AWS services such as AWS Glue, Amazon EMR, and Amazon Athena, natively integrate with AWS Glue Data Catalog.
  5. Data processing: Amazon EMR and AWS Glue process the raw data and places it into analytics zones (transformed) and curated zones (enriched) S3 buckets. Amazon EMR was used for big data processing and AWS Glue for standard ETL processes. AWS Lambda and AWS Step Functions were used to initiate monitoring and ETL processes.
  6. Data consumption: After Audit Surveillance data was transformed and enriched, the data was consumed by various personas within the firm as follows:
    • AWS Lambda and Amazon API Gateway were used to support consumption for data service compliance users.
    • Amazon QuickSight was used to create business intelligence dashboards for compliance business users.
    • Amazon Athena was used to query transformed and enriched data for compliance IT users.
  7. Security: AWS Key Management Service (KMS) customer managed keys were used for encryption at rest, and TLS for encryption at transition. Access to the encryption keys is controlled using AWS Identity and Access Management (IAM) and is monitored through detailed audit trails in AWS CloudTrail. Amazon CloudWatch was used for monitoring, and thresholds were created to determine when to send alerts.
  8. Governance: AWS IAM roles were attached to compliance users that permitted the administrator to grant access. This was only given to approved users or programs that went through authentication and authorization through AWS SSO. Access is logged and permissions can be granted or denied by the administrator. AWS Lake Formation is used for fine-grained access controls to grant/revoke permissions at the database, table, or column-level access.

Conclusion

The Parametric DMO team successfully replaced their on-premises Audit Surveillance Data Lake. They now have a modern, flexible, highly available, and scalable data platform on AWS, with purpose-built analytics services.

This change resulted in a 5x cost savings, and provides for a 10x data growth. There are now fast responses to internal and external audit requests (hours rather than days or weeks). This migration has given the company access to a wider breadth of AWS analytics services, which offers greater flexibility and options.

Maintaining the on-premises data lake would have required significant investment in both hardware upgrade costs and annual licensing and upgrade vendor consulting fees. Parametric’s decision to migrate their on-premises data lake has yielded proven cost benefits. And it has introduced new functions, service, and capabilities that were previously unavailable to Parametric DMO.

You may also achieve similar efficiencies and increase scalability by migrating on-premises data platforms into AWS. Read more and get started on building Data Lakes on AWS.

Accelerate large-scale data migration validation using PyDeequ

Post Syndicated from Mahendar Gajula original https://aws.amazon.com/blogs/big-data/accelerate-large-scale-data-migration-validation-using-pydeequ/

Many enterprises are migrating their on-premises data stores to the AWS Cloud. During data migration, a key requirement is to validate all the data that has been moved from on premises to the cloud. This data validation is a critical step and if not done correctly, may result in the failure of the entire project. However, developing custom solutions to determine migration accuracy by comparing the data between the source and target can often be time-consuming.

In this post, we walk through a step-by-step process to validate large datasets after migration using PyDeequ. PyDeequ is an open-source Python wrapper over Deequ (an open-source tool developed and used at Amazon). Deequ is written in Scala, whereas PyDeequ allows you to use its data quality and testing capabilities from Python and PySpark.

Prerequisites

Before getting started, make sure you have the following prerequisites:

Solution overview

This solution uses the following services:

  • Amazon RDS for My SQL as the database engine for the source database.
  • Amazon Simple Storage Service (Amazon S3) or Hadoop Distributed File System (HDFS) as the target.
  • Amazon EMR to run the PySpark script. We use PyDeequ to validate data between MySQL and the corresponding Parquet files present in the target.
  • AWS Glue to catalog the technical table, which stores the result of the PyDeequ job.
  • Amazon Athena to query the output table to verify the results.

We use profilers, which is one of the metrics computation components of PyDeequ. We use this to analyze each column in the given dataset to calculate statistics like completeness, approximate distinct values, and data types.

The following diagram illustrates the solution architecture.

In this example, you have four tables in your on-premises database that you want to migrate: tbl_books, tbl_sales, tbl_venue, and tbl_category.

Deploy the solution

To make it easy for you to get started, we created an AWS CloudFormation template that automatically configures and deploys the solution for you.

The CloudFormation stack performs the following actions:

  • Launches and configures Amazon RDS for MySQL as a source database
  • Launches Secrets Manager for storing the credentials for accessing the source database
  • Launches an EMR cluster, creates and loads the database and tables on the source database, and imports the open-source library for PyDeequ at the EMR primary node
  • Runs the Spark ingestion process from Amazon EMR, connecting to the source database, and extracts data to Amazon S3 in Parquet format

To deploy the solution, complete the following steps:

  1. Choose Launch Stack to launch the CloudFormation template.

The template is launched in the US East (N. Virginia) Region by default.

  1. On the Select Template page, keep the default URL for the CloudFormation template, then choose Next.
  2. On the Specify Details page, provide values for the parameters that require input (see the following screenshot).
  3. Choose Next.
  4. Choose Next again.
  5. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  6. Choose Create stack.

You can view the stack outputs on the AWS Management Console or by using the following AWS Command Line Interface (AWS CLI) command:

aws cloudformation describe-stacks --stack-name <stack-name> --region us-east-1 --query Stacks[0].Outputs

It takes approximately 20–30 minutes for the deployment to complete. When the stack is complete, you should see the resources in the following table launched and available in your account.

Resource Name Functionality
DQBlogBucket The S3 bucket that stores the migration accuracy results for the AWS Glue Data Catalog table
EMRCluster The EMR cluster to run the PyDeequ validation process
SecretRDSInstanceAttachment Secrets Manager for securely accessing the source database
SourceDBRds The source database (Amazon RDS)

When the EMR cluster is launched, it runs the following steps as part of the post-cluster launch:

  • DQsetupStep – Installs the Deequ JAR file and MySQL connector. This step also installs Boto3 and the PyDeequ library. It also downloads sample data files to use in the next step.
  • SparkDBLoad – Runs the initial data load to the MySQL database or table. This step creates the test environment that we use for data validation purposes. When this step is complete, we have four tables with data on MySQL and respective data files in Parquet format on HDFS in Amazon EMR.

When the Amazon EMR step SparkDBLoad is complete, we verify the data records in the source tables. You can connect to the source database using your preferred SQL editor. For more details, see Connecting to a DB instance running the MySQL database engine.

The following screenshot is a preview of sample data from the source table MyDatabase.tbl_books.

Validate data with PyDeequ

Now the test environment is ready and we can perform data validation using PyDeequ.

  1. Use Secure Shell (SSH) to connect to the primary node.
  2. Run the following Spark command, which performs data validation and persists the results to an AWS Glue Data Catalog table (db_deequ.db_migration_validation_result):
spark-submit --jars deequ-1.0.5.jar pydeequ_validation.py 

The script and JAR file are already available on your primary node if you used the CloudFormation template. The PySpark script computes PyDeequ metrics on the source MySQL table data and target Parquet files in Amazon S3. The metrics currently calculated as part of this example are as follows:

  • Completeness to measure fraction of not null values in a column
  • Approximate number of distinct values
  • Data type of column

If required, we can compute more metrics for each column. To see the complete list of supported metrics, see the PyDeequ package on GitHub.

The output metrics from the source and target are then compared using a PySpark DataFrame.

When that step is complete, the PySpark script creates the AWS Glue table db_deequ.db_migration_validation_result in your account, and you can query this table from Athena to verify the migration accuracy.

Verify data validation results with Athena

You can use Athena to check the overall data validation summary of all the tables. The following query shows you the aggregated data output. It lists all the tables you validated using PyDeequ and how many columns match between the source and target.

select table_name,
max(src_count) as "source rows",
max(tgt_count) as "target rows",
count(*) as "total columns",
sum(case when status='Match' then 1 else 0 end) as "matching columns",
sum(case when status<>'Match' then 1 else 0 end) as "non-matching columns"
from  "db_deequ"."db_migration_validation_result"
group by table_name;

The following screenshot shows our results.

Because all your columns match, you can have high confidence that the data has been exported correctly.

You can also check the data validation report for any table. The following query gives detailed information about any specific table metrics captured as part of PyDeequ validation:

select col_name,src_datatype,tgt_datatype,src_completeness,tgt_completeness,src_approx_distinct_values,tgt_approx_distinct_values,status 
from "db_deequ"."db_migration_validation_result"
where table_name='tbl_sales'

The following screenshot shows the query results. The last column status is the validation result for the columns in the table.

Clean up

To avoid incurring additional charges, complete the following steps to clean up your resources when you’re done with the solution:

  1. Delete the AWS Glue database and table db_deequ.db_migration_validation_result.
  2. Delete the prefixes and objects you created from the bucket dqblogbucket-${AWS::AccountId}.
  3. Delete the CloudFormation stack, which removes your additional resources.

Customize the solution

The solution consists of two parts:

  • Data extraction from the source database
  • Data validation using PyDeequ

In this section, we discuss ways to customize the solution based on your needs.

Data extraction from the source database

Depending on your data volume, there are multiple ways of extracting data from on-premises database sources to AWS. One recommended service is AWS Data Migration Service (AWS DMS). You can also use AWS Glue, Spark on Amazon EMR, and other services.

In this post, we use PySpark to connect to the source database using a JDBC connection and extract the data into HDFS using an EMR cluster.

The primary reason is that we’re already using Amazon EMR for PyDeequ, and we can use the same EMR cluster for data extraction.

In the CloudFormation template, the Amazon EMR step SparkDBLoad runs the PySpark script blogstep3.py. This PySpark script uses Secrets Manager and a Spark JDBC connection to extract data from the source to the target.

Data validation using PyDeequ

In this post, we use ColumnProfilerRunner from the pydeequ.profiles package for metrics computation. The source data is from the database using a JDBC connection, and the target data is from data files in HDFS and Amazon S3.

To create a DataFrame with metrics information for the source data, use the following code:

df_readsrc = spark.read.format('jdbc').option('url',sqlurl).option('dbtable',tbl_name).option('user',user).option('password',pwd).load()

result_rds = ColumnProfilerRunner(spark).onData(df_readsrc).run()
a=[]
for col, profile in result_rds.profiles.items():
    b=[]
    b.append(""+col +","+ str(profile.completeness)+","+ str(profile.approximateNumDistinctValues)+","+ str(profile.dataType)+"")
    a.append(b[0])

rdd1 = spark.sparkContext.parallelize(a)
row_rdd = rdd1.map(lambda x: Row(x))
df=spark.createDataFrame(row_rdd,['column'])

finalDataset = df.select(split(df.column,",")).rdd.flatMap(lambda x: x).toDF(schema=['column','completeness','approx_distinct_values','inferred_datatype'])

Similarly, the metrics is computed for the target (the data file).

You can create a temporary view from the DataFrame to use in the next step for metrics comparison.

After we have both the source (vw_Source) and target (vw_Target) available, we use the following query in Spark to generate the output result:

df_result = spark.sql("select t1.table_name as table_name,t1.column as col_name,t1.inferred_datatype as src_datatype,t2.inferred_datatype as tgt_datatype,t1.completeness as src_completeness,t2.completeness as tgt_completeness,t1.approx_distinct_values as src_approx_distinct_values,t2.approx_distinct_values as tgt_approx_distinct_values,t1.count as src_count,t2.count as tgt_count,case when t1.inferred_datatype=t2.inferred_datatype and t1.completeness=t2.completeness and t1.approx_distinct_values=t2.approx_distinct_values and t1.count=t2.count then 'Match' else 'No Match' end as status from vw_Source t1 left outer join vw_Target t2 on t1.column = t2.column")

The generated result is stored in the db_deequ.db_migration_validation_result table in the Data Catalog.

If you used the CloudFormation template, the entire PyDeequ code used in this post is available at the path /home/hadoop/pydeequ_validation.py in the EMR cluster.

You can modify the script to include or exclude tables as per your requirements.

Conclusion

This post showed you how you can use PyDeequ to accelerate the post-migration data validation process. PyDeequ helps you calculate metrics at the column level. You can also use more PyDeequ components like constraint verification to build a custom data validation framework.

For more use cases on Deequ, check out the following:


About the Authors

Mahendar Gajula is a Sr. Data Architect at AWS. He works with AWS customers in their journey to the cloud with a focus on Big data, Data Lakes, Data warehouse and AI/ML projects. In his spare time, he enjoys playing tennis and spending time with his family.

Nitin Srivastava is a Data & Analytics consultant at Amazon Web Services. He has more than a decade of datawarehouse experience along with designing and implementing large scale Big Data and Analytics solutions. He works with customers to deliver the next generation big data analytics platform using AWS technologies.

Stream data from relational databases to Amazon Redshift with upserts using AWS Glue streaming jobs

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/stream-data-from-relational-databases-to-amazon-redshift-with-upserts-using-aws-glue-streaming-jobs/

Traditionally, read replicas of relational databases are often used as a data source for non-online transactions of web applications such as reporting, business analysis, ad hoc queries, operational excellence, and customer services. Due to the exponential growth of data volume, it became common practice to replace such read replicas with data warehouses or data lakes to have better scalability and performance. In most real-world use cases, it’s important to replicate the data from a source relational database to the target in real time. Change data capture (CDC) is one of the most common design patterns to capture the changes made in the source database and relay them to other data stores.

AWS offers a broad selection of purpose-built databases for your needs. For analytic workloads such as reporting, business analysis, and ad hoc queries, Amazon Redshift is powerful option. With Amazon Redshift, you can query and combine exabytes of structured and semi-structured data across your data warehouse, operational database, and data lake using standard SQL.

To achieve CDC from Amazon Relational Database Service (Amazon RDS) or other relational databases to Amazon Redshift, the simplest solution is to create an AWS Database Migration Service (AWS DMS) task from the database to Amazon Redshift. This approach works well for simple data replication. To have more flexibility to denormalize, transform, and enrich the data, we recommend using Amazon Kinesis Data Streams and AWS Glue streaming jobs between AWS DMS tasks and Amazon Redshift. This post demonstrates how this second approach works in a customer scenario.

Example use case

For our example use case, we have a database that stores data of a fictional organization that holds sports events. We have three dimension tables: sport_event, ticket, and customer, and one fact table: ticket_activity. The table sport_event stores sport type (such as baseball or football), date, and location. The table ticket stores seat level, location, and ticket policy for the target sport event. The table customer stores individual customer names, email addresses, and phone numbers, which are sensitive information. When a customer buys a ticket, the activity (e.g. who purchased the ticket) is recorded in the table ticket_activity. One record is inserted into the table ticket_activity every time a customer buys a ticket, so new records are being ingested into this fact table continuously. The records ingested into the table ticket_activity are only updated when needed, when an administrator maintains the data.

We assume a persona, a data analyst, who is responsible for analyzing trends of the sports activity from this continuous data in real time. To use Amazon Redshift as a primary data mart, the data analyst needs to enrich and clean the data so that users like business analysts can understand and utilize the data easily.

The following are examples of the data in each table.

The following is the dimension table sport_event.

event_id sport_type start_date location
1 35 Baseball 9/1/2021 Seattle, US
2 36 Baseball 9/18/2021 New York, US
3 37 Football 10/5/2021 San Francisco, US

The following is the dimension table ticket (the field event_id is the foreign key for the field event_id in the table sport_event).

ticket_id event_id seat_level seat_location ticket_price
1 1315 35 Standard S-1 100
2 1316 36 Standard S-2 100
3 1317 37 Premium P-1 300

The following is the dimension table customer.

customer_id name email phone
1 222 Teresa Stein [email protected] +1-296-605-8486
2 223 Caleb Houston [email protected] 087-237-9316×2670
3 224 Raymond Turner [email protected] +1-786-503-2802×2357

The following is the fact table ticket_activity (the field purchased_by is the foreign key for the field customer_id in the table customer).

ticket_id purchased_by created_by updated_by
1 1315 222 8/15/2021 8/15/2021
2 1316 223 8/30/2021 8/30/2021
3 1317 224 8/31/2021 8/31/2021

To make the data easy to analyze, the data analyst wants to have only one table that includes all the information instead of joining all four tables every time they want to analyze. They also want to mask the field phone_number and tokenize the field email_address as sensitive information. To meet this requirement, we merge these four tables into one table and denormalize, tokenize, and mask the data.

The following is the destination table for analysis, sport_event_activity.

ticket_id event_id sport_type start_date location seat_level seat_location ticket_price purchased_by name email_address phone_number created_at updated_at
1 1315 35 Baseball 9/1/2021 Seattle, USA Standard S-1 100 222 Teresa Stein 990d081b6a420d04fbe07dc822918c7ec3506b12cd7318df7eb3af6a8e8e0fd6 +*-***-***-**** 8/15/2021 8/15/2021
2 1316 36 Baseball 9/18/2021 New York, USA Standard S-2 100 223 Caleb Houston c196e9e58d1b9978e76953ffe0ee3ce206bf4b88e26a71d810735f0a2eb6186e ***-***-****x**** 8/30/2021 8/30/2021
3 1317 37 Football 10/5/2021 San Francisco, US Premium P-1 300 224 Raymond Turner 885ff2b56effa0efa10afec064e1c27d1cce297d9199a9d5da48e39df9816668 +*-***-***-****x**** 8/31/2021 8/31/2021

Solution overview

The following diagram depicts the architecture of the solution that we deploy using AWS CloudFormation.

We use an AWS DMS task to capture the changes in the source RDS instance, Kinesis Data Streams as a destination of the AWS DMS task CDC replication, and an AWS Glue streaming job to read changed records from Kinesis Data Streams and perform an upsert into the Amazon Redshift cluster. In the AWS Glue streaming job, we enrich the sports-event records.

Set up resources with AWS CloudFormation

This post includes a CloudFormation template for a quick setup. You can review and customize it to suit your needs.

The CloudFormation template generates the following resources:

  • An Amazon RDS database instance (source).
  • An AWS DMS replication instance, used to replicate the table ticket_activity to Kinesis Data Streams.
  • A Kinesis data stream.
  • An Amazon Redshift cluster (destination).
  • An AWS Glue streaming job, which reads from Kinesis Data Streams and the RDS database instance, denormalizes, masks, and tokenizes the data, and upserts the records into the Amazon Redshift cluster.
  • Three AWS Glue Python shell jobs:
    • rds-ingest-data-initial-<CloudFormation Stack name> creates four source tables on Amazon RDS and ingests the initial data into the tables sport_event, ticket, and customer. Sample data is automatically generated at random by Faker library.
    • rds-ingest-data-incremental-<CloudFormation Stack name> ingests new ticket activity data into the source table ticket_activity on Amazon RDS continuously. This job simulates customer activity.
    • rds-upsert-data-<CloudFormation Stack name> upserts specific records in the source table ticket_activity on Amazon RDS. This job simulates administrator activity.
  • AWS Identity and Access Management (IAM) users and policies.
  • An Amazon VPC, a public subnet, two private subnets, an internet gateway, a NAT gateway, and route tables.
    • We use private subnets for the RDS database instance, AWS DMS replication instance, and Amazon Redshift cluster.
    • We use the NAT gateway to have reachability to pypi.org to use MySQL Connector for Python from the AWS Glue Python shell jobs. It also provides reachability to Kinesis Data Streams and an Amazon Simple Storage Service (Amazon S3) API endpoint.

The following diagram illustrates this architecture.

To set up these resources, you must have the following prerequisites:

To launch the CloudFormation stack, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For S3BucketName, enter the name of your new S3 bucket.
  5. For VPCCIDR, enter the CIDR IP address range that doesn’t conflict with your existing networks.
  6. For PublicSubnetCIDR, enter the CIDR IP address range within the CIDR you gave in VPCCIDR.
  7. For PrivateSubnetACIDR and PrivateSubnetBCIDR, enter the CIDR IP address range within the CIDR you gave for VPCCIDR.
  8. For SubnetAzA and SubnetAzB, choose the subnets you want to use.
  9. For DatabaseUserName, enter your database user name.
  10. For DatabaseUserPassword, enter your database user password.
  11. Choose Next.
  12. On the next page, choose Next.
  13. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  14. Choose Create stack.

Stack creation can take about 20 minutes.

Ingest new records

In this section, we walk you through the steps to ingest new records.

Set up an initial source table

To set up an initial source table in Amazon RDS, complete the following steps:

  1. On the AWS Glue console, choose Jobs.
  2. Select the job rds-ingest-data-initial-<CloudFormation stack name>.
  3. On the Actions menu, choose Run job.
  4. Wait for the Run status to show as SUCCEEDED.

This AWS Glue job creates a source table event on the RDS database instance.

Start data ingestion to the source table on Amazon RDS

To start data ingestion to the source table on Amazon RDS, complete the following steps:

  1. On the AWS Glue console, choose Triggers.
  2. Select the trigger periodical-trigger-<CloudFormation stack name>.
  3. On the Actions menu, choose Activate trigger.
  4. Choose Enable.

This trigger runs the job rds-ingest-data-incremental-<CloudFormation stack name> to ingest one record every minute.

Start data ingestion to Kinesis Data Streams

To start data ingestion from Amazon RDS to Kinesis Data Streams, complete the following steps:

  1. On the AWS DMS console, choose Database migration tasks.
  2. Select the task rds-to-kinesis-<CloudFormation stack name> .
  3. On the Actions menu, choose Restart/Resume.
  4. Wait for the Status to show as Load complete, replication ongoing.

The AWS DMS replication task ingests data from Amazon RDS to Kinesis Data Streams continuously.

Start data ingestion to Amazon Redshift

Next, to start data ingestion from Kinesis Data Streams to Amazon Redshift, complete the following steps:

  1. On the AWS Glue console, choose Jobs.
  2. Select the job streaming-cdc-kinesis2redshift-<CloudFormation stack name>.
  3. On the Actions menu, choose Run job.
  4. Choose Run job again.

This AWS Glue streaming job is implemented based on the guidelines in Updating and inserting new data. It performs the following actions:

  • Creates a staging table on the Amazon Redshift cluster using the Amazon Redshift Data API
  • Reads from Kinesis Data Streams, and creates a DataFrame with filtering only INSERT and UPDATE records
  • Reads from three dimension tables on the RDS database instance
  • Denormalizes, masks, and tokenizes the data
  • Writes into a staging table on the Amazon Redshift cluster
  • Merges the staging table into the destination table
  • Drops the staging table

After about 2 minutes from starting the job, the data should be ingested into the Amazon Redshift cluster.

Validate the ingested data

To validate the ingested data in the Amazon Redshift cluster, complete the following steps:

  1. On the Amazon Redshift console, choose EDITOR in the navigation pane.
  2. Choose Connect to database.
  3. For Connection, choose Create a new connection.
  4. For Authentication, choose Temporary credentials.
  5. For Cluster, choose the Amazon Redshift cluster cdc-sample-<CloudFormation stack name>.
  6. For Database name, enter dev.
  7. For Database user, enter the user that was specified in the CloudFormation template (for example, dbmaster).
  8. Choose Connect.
  9. Enter the query SELECT * FROM sport_event_activity and choose Run.

Now you can see the ingested records in the table sport_event_activity on the Amazon Redshift cluster. Let’s note the value of ticket_id from one of the records. For this post, we choose 1317 as an example.

Update existing records

Your Amazon Redshift cluster now has the latest data ingested from the tables on the source RDS database instance. Let’s update the data in the source table ticket_activity on the RDS database instance to see that the updated records are replicated to the Amazon Redshift cluster side.

The CloudFormation template creates another AWS Glue job. This job upserts the data with specific IDs on the source table event. To upsert the records in the source table, complete the following steps:

  1. On the AWS Glue console, choose Jobs.
  2. Choose the job rds-upsert-data-<CloudFormation stack name>.
  3. On the Actions menu, choose Edit job.
  4. Under Security configuration, script libraries, and job parameters (optional), for Job parameters, update the following parameters:
    1. For Key, enter --ticket_id_to_be_updated.
    2. For Value, replace 1 with one of the ticket IDs you observed on the Amazon Redshift console.
  5. Choose Save.
  6. Choose the job rds-upsert-data-<CloudFormation stack name>.
  7. On the Actions menu, choose Run job.
  8. Choose Run job.

This AWS Glue Python shell job simulates a customer activity to buy a ticket. It updates a record in the source table ticket_activity on the RDS database instance using the ticket ID passed in the job argument --ticket_id_to_be_updated. It automatically selects one customer, updates the field purchased_by with the customer ID, and updates the field updated_at with the current timestamp.

To validate the ingested data in the Amazon Redshift cluster, run the same query SELECT * FROM sport_event_activity. You can filter the record with the ticket_id value you noted earlier.

According to the rows returned to the query, the record ticket_id=1317 has been updated. The field updated_at has been updated from 2021-08-16 06:05:01 to 2021-08-16 06:53:52, and the field purchased_by has been updated from 449 to 14. From this result, you can see that this record has been successfully updated on the Amazon Redshift cluster side as well. You can also choose Queries in the left pane to see past query runs.

Clean up

Now to the final step, cleaning up the resources.

  1. Stop the AWS DMS replication task rds-to-kinesis-<CloudFormation stack name>.
  2. Stop the AWS Glue streaming job streaming-cdc-kinesis2redshift-<CloudFormation stack name>.
  3. Delete the CloudFormation stack.

Conclusion

In this post, we demonstrated how you can stream data—not only new records, but also updated records from relational databases—to Amazon Redshift. With this approach, you can easily achieve upsert use cases on Amazon Redshift clusters. In the AWS Glue streaming job, we demonstrated the common technique to denormalize, mask, and tokenize data for real-world use cases.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys collaborating with different teams to deliver results like this post. In his spare time, he enjoys playing video games with his family.

Roman Gavrilov is an Engineering Manager at AWS Glue. He has over a decade of experience building scalable Big Data and Event-Driven solutions. His team works on Glue Streaming ETL to allow near real time data preparation and enrichment for machine learning and analytics.