All posts by Vishal Pathak

Ingest streaming data to Apache Hudi tables using AWS Glue and Apache Hudi DeltaStreamer

Post Syndicated from Vishal Pathak original https://aws.amazon.com/blogs/big-data/ingest-streaming-data-to-apache-hudi-tables-using-aws-glue-and-apache-hudi-deltastreamer/

In today’s world with technology modernization, the need for near-real-time streaming use cases has increased exponentially. Many customers are continuously consuming data from different sources, including databases, applications, IoT devices, and sensors. Organizations may need to ingest that streaming data into data lakes built on Amazon Simple Storage Service (Amazon S3). You may also need to achieve analytics and machine learning (ML) use cases in near-real time. To ensure consistent results in those near-real-time streaming use cases, incremental data ingestion and atomicity, consistency, isolation, and durability (ACID) properties on data lakes have been a common ask.

To address such use cases, one approach is to use Apache Hudi and its DeltaStreamer utility. Apache Hudi is an open-source data management framework designed for data lakes. It simplifies incremental data processing by enabling ACID transactions and record-level inserts, updates, and deletes of streaming ingestion on data lakes built on top of Amazon S3. Hudi is integrated with well-known open-source big data analytics frameworks, such as Apache Spark, Apache Hive, Presto, and Trino, as well as with various AWS analytics services like AWS Glue, Amazon EMR, Amazon Athena, and Amazon Redshift. The DeltaStreamer utility provides an easy way to ingest streaming data from sources like Apache Kafka into your data lake.

This post describes how to run the DeltaStreamer utility on AWS Glue to read streaming data from Amazon Managed Streaming for Apache Kafka (Amazon MSK) and ingest the data into S3 data lakes. AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, ML, and application development. With AWS Glue, you can create Spark, Spark Streaming, and Python shell jobs to extract, transform, and load (ETL) data. You can create AWS Glue Spark streaming ETL jobs using either Scala or PySpark that run continuously, consuming data from Amazon MSK, Apache Kafka, and Amazon Kinesis Data Streams and writing it to your target.

Solution overview

To demonstrate the DeltaStreamer utility, we use fictional product data that represents product inventory including product name, category, quantity, and last updated timestamp. Let’s assume we stream the data from data sources to an MSK topic. Now we want to ingest this data coming from the MSK topic into Amazon S3 so that we can run Athena queries to analyze business trends in near-real time.

The following diagram provides the overall architecture of the solution described in this post.

To simulate application traffic, we use Amazon Elastic Compute Cloud (Amazon EC2) to send sample data to an MSK topic. Amazon MSK is a fully managed service that makes it easy to build and run applications that use Apache Kafka to process streaming data. To consume the streaming data from Amazon MSK, we set up an AWS Glue streaming ETL job that uses the Apache Hudi Connector 0.10.1 for AWS Glue 3.0, with the DeltaStreamer utility to write the ingested data to Amazon S3. The Apache Hudi Connector 0.9.0 for AWS Glue 3.0 also supports the DeltaStreamer utility.

As the data is being ingested, the AWS Glue streaming job writes the data into the Amazon S3 base path. The data in Amazon S3 is cataloged using the AWS Glue Data Catalog. We then use Athena, which is an interactive query service, to query and analyze the data using standard SQL.

Prerequisites

We use an AWS CloudFormation template to provision some resources for our solution. The template requires you to select an EC2 key pair. This key is configured on an EC2 instance that lives in the public subnet. We use this EC2 instance to ingest data to the MSK cluster running in a private subnet. Make sure you have a key in the AWS Region where you deploy the template. If you don’t have one, you can create a new key pair.

Create the Apache Hudi connection

To add the Apache Hudi Connector for AWS Glue, complete the following steps:

  1. On the AWS Glue Studio console, choose Connectors.
  2. Choose Go to AWS Marketplace.
  3. Search for and choose Apache Hudi Connector for AWS Glue.
  4. Choose Continue to Subscribe.
  5. Review the terms and conditions, then choose Accept Terms.

    After you accept the terms, it takes some time to process the request.
    When the subscription is complete, you see the Effective date populated next to the product.
  6. Choose Continue to Configuration.
  7. For Fulfillment option, choose Glue 3.0.
  8. For Software version, choose 0.10.1.
  9. Choose Continue to Launch.
  10. Choose Usage instructions, and then choose Activate the Glue connector from AWS Glue Studio.

    You’re redirected to AWS Glue Studio.
  11. For Name, enter Hudi-Glue-Connector.
  12. Choose Create connection and activate connector.

A message appears that the connection was successfully created. Verify that the connection is visible on the AWS Glue Studio console.

Launch the CloudFormation stack

For this post, we provide a CloudFormation template to create the following resources:

  • VPC, subnets, security groups, and VPC endpoints
  • AWS Identity and Access Management (IAM) roles and policies with required permissions
  • An EC2 instance running in a public subnet within the VPC with Kafka 2.12 installed and with the source data initial load and source data incremental load JSON files
  • An Amazon MSK server running in a private subnet within the VPC
  • An AWS Glue Streaming DeltaStreamer job to consume the incoming data from the Kafka topic and write it to Amazon S3
  • Two S3 buckets: one bucket stores code and config files, and the other is the target for the AWS Glue streaming DeltaStreamer job

To create the resources, complete the following steps:

  1. Choose Launch Stack:
  2. For Stack name, enter hudi-deltastreamer-glue-blog.
  3. For ClientIPCIDR, enter the IP address of your client that you use to connect to the EC2 instance.
  4. For HudiConnectionName, enter the AWS Glue connection you created earlier (Hudi-Glue-Connector).
  5. For KeyName, choose the name of the EC2 key pair that you created as a prerequisite.
  6. For VpcCIDR, leave as is.
  7. Choose Next.
  8. Choose Next.
  9. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  10. Choose Create stack.

After the CloudFormation template is complete and the resources are created, the Outputs tab shows the following information:

  • HudiDeltastreamerGlueJob – The AWS Glue streaming job name
  • MSKCluster – The MSK cluster ARN
  • PublicIPOfEC2InstanceForTunnel – The public IP of the EC2 instance for tunnel
  • TargetS3Bucket – The S3 bucket name

Create a topic in the MSK cluster

Next, SSH to Amazon EC2 using the key pair you created and run the following commands:

  1. SSH to the EC2 instance as ec2-user:
    ssh -i <KeyName> ec2-user@<PublicIPOfEC2InstanceForTunnel>

    You can get the KeyName value on the Parameters tab and the public IP of the EC2 instance for tunnel on the Outputs tab of the CloudFormation stack.

  2. For the next command, retrieve the bootstrap server endpoint of the MSK cluster by navigating to msk-source-cluster on the Amazon MSK console and choosing View client information.
  3. Run the following command to create the topic in the MSK cluster hudi-deltastream-demo:
    ./kafka_2.12-2.6.2/bin/kafka-topics.sh --create \
    --topic hudi-deltastream-demo \
    --bootstrap-server "<replace text with value under private endpoint on MSK>" \
    --partitions 1 \
    --replication-factor 2 \
    --command-config ./config_file.txt

  4. Ingest the initial data from the deltastreamer_initial_load.json file into the Kafka topic:
    ./kafka_2.12-2.6.2/bin/kafka-console-producer.sh \
    --broker-list "<replace text with value under private endpoint on MSK>" \
    --topic hudi-deltastream-demo \
    --producer.config ./config_file.txt < deltastreamer_initial_load.json

The following is the schema of a record ingested into the Kafka topic:

{
  "type":"record",
  "name":"products",
  "fields":[{
     "name": "id",
     "type": "int"
  }, {
     "name": "category",
     "type": "string"
  }, {
     "name": "ts",
     "type": "string"
  },{
     "name": "name",
     "type": "string"
  },{
     "name": "quantity",
     "type": "int"
  }
]}

The schema uses the following parameters:

  • id – The product ID
  • category – The product category
  • ts – The timestamp when the record was inserted or last updated
  • name – The product name
  • quantity – The available quantity of the product in the inventory

The following code gives an example of a record:

{
    "id": 1, 
    "category": "Apparel", 
    "ts": "2022-01-02 10:29:00", 
    "name": "ABC shirt", 
    "quantity": 4
}

Start the AWS Glue streaming job

To start the AWS Glue streaming job, complete the following steps:

  1. On the AWS Glue Studio console, find the job whose name matches the HudiDeltastreamerGlueJob value on the stack's Outputs tab.
  2. Choose the job to review the script and job details.
  3. On the Job details tab, replace the value of the --KAFKA_BOOTSTRAP_SERVERS key with the Amazon MSK bootstrap server’s private endpoint.
  4. Choose Save to save the job settings.
  5. Choose Run to start the job.

When the AWS Glue streaming job runs, the records from the MSK topic are consumed and written to the target S3 bucket created by AWS CloudFormation. To find the bucket name, check the stack’s Outputs tab for the TargetS3Bucket key value.

The data in Amazon S3 is stored in Parquet file format. In this example, the data written to Amazon S3 isn’t partitioned, but you can enable partitioning by specifying hoodie.datasource.write.partitionpath.field=<column_name> as the partition field and setting hoodie.datasource.write.hive_style_partitioning to True in the Hudi configuration property in the AWS Glue job script.

In this post, we write the data to a non-partitioned table, so we set the following two Hudi configurations (illustrated in the sketch after this list):

  • hoodie.datasource.hive_sync.partition_extractor_class is set to org.apache.hudi.hive.NonPartitionedExtractor
  • hoodie.datasource.write.keygenerator.class is set to org.apache.hudi.keygen.NonpartitionedKeyGenerator
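
The following is a minimal sketch, not the exact job script, of how these settings could be collected as a Python dictionary of Hudi write options, mirroring the dictionary style used for Hudi options elsewhere on this page; the commented partitioned variant uses category as a hypothetical partition column.

# Sketch only: non-partitioned Hudi settings used in this post, collected as a
# Python dictionary of write options. The commented-out lines show the
# partitioned alternative; "category" is a hypothetical partition column.
hudi_write_options = {
    'hoodie.table.name': 'product_table',
    'hoodie.datasource.hive_sync.table': 'product_table',
    'hoodie.datasource.hive_sync.partition_extractor_class':
        'org.apache.hudi.hive.NonPartitionedExtractor',
    'hoodie.datasource.write.keygenerator.class':
        'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
    # To partition the table instead, drop the two settings above and use:
    # 'hoodie.datasource.write.partitionpath.field': 'category',
    # 'hoodie.datasource.write.hive_style_partitioning': 'true',
}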

DeltaStreamer options and configuration

DeltaStreamer has multiple options available; the following are the options set in the AWS Glue streaming job used in this post (a sketch of how they fit together follows the list):

  • continuous – Runs DeltaStreamer in continuous mode, repeatedly fetching from the source in a loop.
  • enable-hive-sync – Enables table sync to the Apache Hive Metastore.
  • schemaprovider-class – Defines the class for the schema provider to attach schemas to the input and target table data.
  • source-class – Defines the source class to read data and has many built-in options.
  • source-ordering-field – The field used to break ties between records with the same key in the input data. Defaults to ts (the Unix timestamp of the record).
  • target-base-path – Defines the path for the target Hudi table.
  • table-type – Indicates the Hudi storage type to use. In this post, it’s set to COPY_ON_WRITE.
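
To make the shape of these options concrete, the following is a hedged sketch of how they might be assembled as an argument list in the form DeltaStreamer expects. The schema provider and source class names and the target path are illustrative assumptions, not values copied from the job script.

# Hypothetical sketch: DeltaStreamer options assembled as an argument list.
# Class names and the S3 path are illustrative assumptions.
deltastreamer_args = [
    '--continuous',                      # keep fetching from the source in a loop
    '--enable-hive-sync',                # sync the table to the metastore/Data Catalog
    '--schemaprovider-class', 'org.apache.hudi.utilities.schema.FilebasedSchemaProvider',
    '--source-class', 'org.apache.hudi.utilities.sources.JsonKafkaSource',
    '--source-ordering-field', 'ts',
    '--target-base-path', 's3://<TargetS3Bucket>/product_table',
    '--table-type', 'COPY_ON_WRITE',
]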

The following are some of the important DeltaStreamer configuration properties set in the AWS Glue streaming job:

# Schema provider props (change to absolute path based on your installation)
hoodie.deltastreamer.schemaprovider.source.schema.file=s3://" + args("CONFIG_BUCKET") + "/artifacts/hudi-deltastreamer-glue/config/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=s3://" + args("CONFIG_BUCKET") + "/artifacts/hudi-deltastreamer-glue/config/schema.avsc

# Kafka Source
hoodie.deltastreamer.source.kafka.topic=hudi-deltastream-demo

#Kafka props
bootstrap.servers=args("KAFKA_BOOTSTRAP_SERVERS")
auto.offset.reset=earliest
security.protocol=SSL

The configuration contains the following details:

  • hoodie.deltastreamer.schemaprovider.source.schema.file – The schema of the source record
  • hoodie.deltastreamer.schemaprovider.target.schema.file – The schema of the target record
  • hoodie.deltastreamer.source.kafka.topic – The source MSK topic name
  • bootstrap.servers – The Amazon MSK bootstrap server’s private endpoint
  • auto.offset.reset – The consumer’s behavior when there is no committed position or when an offset is out of range

Hudi configuration

The following are some of the important Hudi configuration options, which enable us to achieve in-place updates for the generated schema (see the sketch after this list for plausible values):

  • hoodie.datasource.write.recordkey.field – The record key field. This is the unique identifier of a record in Hudi.
  • hoodie.datasource.write.precombine.field – When two records have the same record key value, Apache Hudi picks the one with the largest value for the pre-combined field.
  • hoodie.datasource.write.operation – The operation on the Hudi dataset. Possible values include UPSERT, INSERT, and BULK_INSERT.
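
As a concrete illustration, the following sketch shows how these three options would plausibly map onto the sample product schema in this post, with id as the record key and ts as the precombine field; it is an assumption based on the schema above rather than a copy of the job configuration.

# Sketch: plausible values for the sample product data (id as the record key,
# ts as the precombine field); adjust for your own schema.
hudi_upsert_options = {
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.datasource.write.operation': 'upsert',
}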

AWS Glue Data Catalog table

The AWS Glue job creates a Hudi table in the Data Catalog mapped to the Hudi dataset on Amazon S3. Because the hoodie.datasource.hive_sync.table configuration parameter is set to product_table, the table is visible under the default database in the Data Catalog.

The following screenshot shows the Hudi table column names in the Data Catalog.

Query the data using Athena

With the Hudi datasets available in Amazon S3, you can query the data using Athena. Let’s use the following query:

SELECT * FROM "default"."product_table";

The following screenshot shows the query output. The table product_table has four records from the initial ingestion: two records for the category Apparel, one for Cosmetics, and one for Footwear.

Load incremental data into the Kafka topic

Now suppose that the store sold some quantity of apparel and footwear and added a new product to its inventory, as shown in the following code. The store sold two items of product ID 1 (Apparel) and one item of product ID 3 (Footwear). The store also added the Cosmetics category, with product ID 5.

{"id": 1, "category": "Apparel", "ts": "2022-01-02 10:45:00", "name": "ABC shirt", "quantity": 2}
{"id": 3, "category": "Footwear", "ts": "2022-01-02 10:50:00", "name": "DEF shoes", "quantity": 5}
{"id": 5, "category": "Cosmetics", "ts": "2022-01-02 10:55:00", "name": "JKL Lip gloss", "quantity": 7}

Let’s ingest the incremental data from the deltastreamer_incr_load.json file to the Kafka topic and query the data from Athena:

./kafka_2.12-2.6.2/bin/kafka-console-producer.sh \
--broker-list "<replace text with value under private endpoint on MSK>" \
--topic hudi-deltastream-demo \
--producer.config ./config_file.txt < deltastreamer_incr_load.json

Within a few seconds, you should see a new Parquet file created in the target S3 bucket under the product_table prefix. The following is the screenshot from Athena after the incremental data ingestion showing the latest updates.

Additional considerations

There are some hard-coded Hudi options in the AWS Glue Streaming job scripts. These options are set for the sample table that we created for this post, so update the options based on your workload.

Clean up

To avoid incurring future charges, delete the CloudFormation stack, which deletes all the underlying resources created by this post, except for the product_table table created in the default database. Manually delete the product_table table under the default database from the Data Catalog.

Conclusion

In this post, we illustrated how you can add the Apache Hudi Connector for AWS Glue and perform streaming ingestion into an S3 data lake using Apache Hudi DeltaStreamer with AWS Glue. You can use the Apache Hudi Connector for AWS Glue to create a serverless streaming pipeline using AWS Glue streaming jobs with the DeltaStreamer utility to ingest data from Kafka. We demonstrated this by reading the latest updated data using Athena in near-real time.

As always, AWS welcomes feedback. If you have any comments or questions on this post, please share them in the comments.


About the authors

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey in AWS, Vishal helped customers implement business intelligence, data warehouse, and data lake projects in the US and Australia.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys learning different use cases from customers and sharing knowledge about big data technologies with the wider community.

Anand Prakash is a Senior Solutions Architect at AWS Data Lab. Anand focuses on helping customers design and build AI/ML, data analytics, and database solutions to accelerate their path to production.

Writing to Apache Hudi tables using AWS Glue Custom Connector

Post Syndicated from Vishal Pathak original https://aws.amazon.com/blogs/big-data/writing-to-apache-hudi-tables-using-aws-glue-connector/

In today’s world, most organizations have to tackle the three V’s of big data: variety, volume, and velocity. In this blog post, we talk about dealing with the variety and volume aspects of big data. The challenge of dealing with variety involves processing data from various SQL and NoSQL systems. This variety can include data from RDBMS sources such as Amazon Aurora, NoSQL sources such as Amazon DynamoDB, or third-party APIs.

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. To enable customers to process data from a variety of sources, the AWS Glue team has introduced AWS Glue Custom Connectors, a new capability in AWS Glue and AWS Glue Studio that makes it easy for you to transfer data from SaaS applications and custom data sources to your data lake in Amazon S3. With just a few clicks, you can search and select connectors from the AWS Marketplace and begin your data preparation workflow in minutes. This new feature is over and above the AWS Glue Connections feature in the AWS Glue service.

In this post, we simplify the process to create Hudi tables with the AWS Glue Custom Connector. The JAR wrapped by the first version of the AWS Glue Custom Connector is based on Apache Hudi 0.5.3. Instructions on creating the JAR file are in the previous post of this series.

Whereas the first post focused on creating an end-to-end architecture for replicating the data in an RDBMS source to a Lakehouse, this post focuses on the volume aspect of big data. In this post, we create a Hudi table with an initial load of over 200 million records and then update 70 million of those records. The connector not only writes the data to Amazon Simple Storage Service (Amazon S3), but also creates the tables in the AWS Glue Data Catalog. If you’re creating a partitioned Hudi table, the connector also creates the partitions in the Data Catalog. We discuss the code for creating a partitioned Hudi table in the previous post in this series.

We use the Copy On Write storage type, which gives better read performance compared to Merge On Read. For more information about Hudi storage types, see Hudi Dataset Storage Types and Storage Types & Views.

Note that this post focuses on using the AWS Glue Custom Connector to write to Apache Hudi tables. Please implement other best practices such as encryption and network security while implementing the architecture for your workloads.

Creating the Apache Hudi connection using AWS Glue Custom Connector

To create your AWS Glue job with an AWS Glue Custom Connector, complete the following steps:

  1. Go to the AWS Glue Studio console, search for AWS Glue Connector for Apache Hudi, and choose the AWS Glue Connector for Apache Hudi link.
  2. Choose Continue to Subscribe.
  3. Review the terms and conditions and choose Accept Terms to continue.
  4. Make sure that the subscription is complete and you see the Effective date populated next to the product, then choose Continue to Configuration.
  5. As of this writing, 0.5.3 is the latest version of the AWS Glue Connector for Apache Hudi. Make sure that 0.5.3 (Nov 19, 2020) is selected in the Software Version dropdown and Activate in AWS Glue Studio is selected in the Delivery Method dropdown, then choose Continue to Launch.
  6. Under Launch this software, choose Usage Instructions, and then choose Activate the Glue connector for Apache Hudi in AWS Glue Studio.

You’re redirected to AWS Glue Studio.

  1. For Name, enter a name for your connection (for example, hudi-connection).
  2. For Description, enter a description.
  3. Choose Create connection and activate connector.

A message appears that the connection was successfully created, and the connection is now visible on the AWS Glue Studio console.

Configuring resources and permissions

For this post, we provide an AWS CloudFormation template to create the following resources:

  • Two AWS Glue jobs: hudi-init-load-job and hudi-upsert-job
  • An S3 bucket to store the Python scripts for these jobs
  • An S3 bucket to store the output files of these jobs
  • An AWS Lambda function to copy the scripts from the public S3 bucket to your account
  • AWS Identity and Access Management (IAM) roles and policies with appropriate permissions

Launch the following stack, providing the connection name you created in the previous section for the HudiConnectionName parameter:

Select the I acknowledge that AWS CloudFormation might create IAM resources with custom names check box before choosing Create stack.

If you have AWS Lake Formation enabled in the Region in which you’re implementing this solution, make sure that you give the HudiConnectorExecuteGlueHudiJobRole role Create table permission in the default database. HudiConnectorExecuteGlueHudiJobRole is created by the CloudFormation stack that you created above.

HudiConnectorExecuteGlueHudiJobRole should also have Create database permission. You can grant this permission in the Database creators section under the Admins and database creators tab.

Running the load job

You’re now ready to run the first of your two jobs. 

  1. On the AWS Glue console, select the job hudi-init-load-job.
  2. On the Action menu, choose Run job.

My job finished in less than 10 minutes. The job inserted over 204 million records into the Hudi table.

Although the rest of the code is standard Hudi PySpark code, I want to call out the last line to show how easy it is to write to Hudi tables using AWS Glue:

glueContext.write_dynamic_frame.from_options(frame = DynamicFrame.fromDF(inputDf, glueContext, "inputDf"), connection_type = "marketplace.spark", connection_options = combinedConf)

In the preceding code, combinedConf is a Python dictionary that includes all your Apache Hudi configurations. You can download the HudiInitLoadNYTaxiData.py script to use.
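
The following is a minimal sketch of what such a combinedConf dictionary might contain for a non-partitioned Copy on Write table. The key columns, connection name, and paths are placeholders, not the exact values used in HudiInitLoadNYTaxiData.py.

# Illustrative only -- placeholder values, not the exact configuration from
# HudiInitLoadNYTaxiData.py.
combinedConf = {
    'className': 'org.apache.hudi',
    'hoodie.table.name': 'ny_yellow_trip_data',
    'hoodie.datasource.write.operation': 'bulk_insert',
    'hoodie.datasource.write.recordkey.field': '<primary_key_column>',
    'hoodie.datasource.write.precombine.field': '<ordering_column>',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.database': 'default',
    'hoodie.datasource.hive_sync.table': 'ny_yellow_trip_data',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.partition_extractor_class':
        'org.apache.hudi.hive.NonPartitionedExtractor',
    'hoodie.datasource.write.keygenerator.class':
        'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
    # Options assumed to be read by the Glue marketplace connector:
    'connectionName': 'hudi-connection',
    'path': 's3://<TargetS3Bucket>/ny_yellow_trip_data',
}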

Querying the data

The ny_yellow_trip_data table is now visible in the default database, and you can query it through Athena.

If you have Lake Formation enabled in this Region, the role or user querying the table should have Select permissions on the table.

You can now run the following query:

select count(*) cnt, vendorid from default.ny_yellow_trip_data group by vendorid

The following screenshot shows our output.

If you have Lake Formation enabled in this Region, make sure that you give Drop permission to HudiConnectorExecuteLambdaFnsRole so the CloudFormation template can drop the default.ny_yellow_trip_data table when you delete the stack.

Running the upsert job

You can now run your second job, hudi-upsert-job. This job reads the newly written data and updates the vendor IDs of all the records that have vendorid=1. The new vendor ID for these records (over 78 million) is set as 9. You can download the HudiUpsertNYTaxiData.py script to use.
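
The following is a rough sketch, not the actual HudiUpsertNYTaxiData.py script, of the kind of logic such an upsert job might run: read the Hudi dataset back from Amazon S3, switch the vendor ID from 1 to 9, and write the changed rows back through the connector with the upsert operation. Paths, key columns, and the connection name are placeholders.

from pyspark.context import SparkContext
from pyspark.sql.functions import col, lit
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

# Hypothetical sketch only; placeholders throughout.
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

hudi_path = 's3://<TargetS3Bucket>/ny_yellow_trip_data'

# Read the Hudi dataset written by the init load job; the trailing glob may
# need extra /* levels if the table is partitioned.
tripsDf = spark.read.format('org.apache.hudi').load(hudi_path + '/*')

# Rows with vendorid=1 get the new vendor ID 9; only these rows are upserted.
updatedDf = tripsDf.filter(col('vendorid') == 1).withColumn('vendorid', lit(9))

combinedConf = {
    # Abbreviated; see the earlier combinedConf sketch for the fuller option set.
    'className': 'org.apache.hudi',
    'hoodie.table.name': 'ny_yellow_trip_data',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': '<primary_key_column>',
    'hoodie.datasource.write.precombine.field': '<ordering_column>',
    'connectionName': '<your Hudi connection name>',
    'path': hudi_path,
}

# Same write pattern as the last line of the init load job.
glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(updatedDf, glueContext, 'updatedDf'),
    connection_type='marketplace.spark',
    connection_options=combinedConf)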

This job also finished in under 10 minutes.

Querying the updated data

You can now query the updated Hudi table in Athena. The following screenshot shows that the vendor ID of over 78 million records has been changed to 9.

Additional considerations

The AWS Glue Connector for Apache Hudi has not been tested for AWS Glue streaming jobs. Additionally, there are some hardcoded Hudi options in the AWS Glue job scripts. These options are set for the sample table that we create for this post. Update the options based on your workload.

Conclusion

In this post, we created an Apache Hudi table with AWS Glue Custom Connector and AWS Glue 2.0 jobs. We read over 200 million records from a public S3 bucket and created an Apache Hudi table using it. We then updated over 70 million of these records. With the new AWS Glue Custom Connector feature, we can now directly write an AWS Glue DynamicFrame to an Apache Hudi table.

Note that you can also use AWS Glue jobs to write to Apache Hudi MoR tables. The post Creating a source to Lakehouse data replication pipe using Apache Hudi, AWS Glue, AWS DMS, and Amazon Redshift talks about the process in detail. While it uses JARs as an external dependency, you can now use the AWS Glue Connector for Apache Hudi for the same operation. That post uses HudiJob.py to write to MoR tables and then uses HudiMoRCompactionJob.scala to compact the MoR tables. Note that HudiMoRCompactionJob.scala has also been implemented using AWS Glue jobs, and hence you can use AWS Glue for the compaction job too.


About the Author

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey in AWS, Vishal helped customers implement business intelligence, data warehouse, and data lake projects in the US and Australia.

Creating a source to Lakehouse data replication pipe using Apache Hudi, AWS Glue, AWS DMS, and Amazon Redshift

Post Syndicated from Vishal Pathak original https://aws.amazon.com/blogs/big-data/creating-a-source-to-lakehouse-data-replication-pipe-using-apache-hudi-aws-glue-aws-dms-and-amazon-redshift/

Most customers have their applications backed by various SQL and NoSQL systems on premises and in the cloud. Because the data is in various independent systems, customers struggle to derive meaningful insights by combining data from all of these sources. Hence, customers create data lakes to bring their data together in a single place.

Typically, a replication tool such as AWS Database Migration Service (AWS DMS) can replicate the data from your source systems to Amazon Simple Storage Service (Amazon S3). When the data is in Amazon S3, customers process it based on their requirements. A typical requirement is to sync the data in Amazon S3 with the updates on the source systems. Although it’s easy to apply updates on a relational database management system (RDBMS) that backs an online source application, it’s tough to apply this change data capture (CDC) process on your data lakes. Apache Hudi is a good way to solve this problem. Currently, you can use Hudi on Amazon EMR to create Hudi tables.

In this post, we use Apache Hudi to create tables in the AWS Glue Data Catalog using AWS Glue jobs. AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. This post enables you to take advantage of the serverless architecture of AWS Glue while upserting data in your data lake, hassle-free.

To write to Hudi tables using AWS Glue jobs, we use a JAR file created using open-source Apache Hudi. This JAR file is used as a dependency in the AWS Glue jobs created through the AWS CloudFormation template provided in this post. Steps to create the JAR file are included in the appendix.
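
The CloudFormation template wires this up for you, but as a rough illustration of the mechanism, a custom JAR is typically attached to a Glue job through the --extra-jars job parameter. The following boto3 sketch shows the idea; the job name, role, and S3 paths are placeholders.

import boto3

glue = boto3.client('glue')

# Illustrative sketch only -- the template in this post already creates the
# jobs. Names, the role, and S3 paths are placeholders.
glue.create_job(
    Name='HudiJob-example',
    Role='<glue-job-role>',
    Command={
        'Name': 'glueetl',
        'ScriptLocation': 's3://<script-bucket>/HudiJob.py',
        'PythonVersion': '3',
    },
    DefaultArguments={
        # Hudi JAR built from open-source Apache Hudi (see the appendix).
        '--extra-jars': 's3://<jar-bucket>/hudi-spark-bundle.jar',
        '--job-language': 'python',
    },
)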

The following diagram illustrates the architecture the CloudFormation template implements.

Prerequisites

The CloudFormation template requires you to select an Amazon Elastic Compute Cloud (Amazon EC2) key pair. This key is configured on an EC2 instance that lives in the public subnet. We use this EC2 instance to get to the Aurora cluster that lives in the private subnet. Make sure you have a key in the Region where you deploy the template. If you don’t have one, you can create a new key pair.

Solution overview

The following are the high-level implementation steps:

  1. Create a CloudFormation stack using the provided template.
  2. Connect to the Amazon Aurora cluster used as a source for this post.
  3. Run InitLoad_TestStep1.sql, in the source Amazon Aurora cluster, to create a schema and a table.

AWS DMS replicates the data from the Aurora cluster to the raw S3 bucket. AWS DMS supports a variety of sources.
The CloudFormation stack creates an AWS Glue job (HudiJob) that is scheduled to run at a frequency set in the ScheduleToRunGlueJob parameter of the CloudFormation stack. This job reads the data from the raw S3 bucket, writes to the Curated S3 bucket, and creates a Hudi table in the Data Catalog. The job also creates an Amazon Redshift external schema in the Amazon Redshift cluster created by the CloudFormation stack.

  4. You can now query the Hudi table in Amazon Athena or Amazon Redshift. Visit Creating external tables for data managed in Apache Hudi or Considerations and Limitations to query Apache Hudi datasets in Amazon Athena for details.
  5. Run IncrementalUpdatesAndInserts_TestStep2.sql on the source Aurora cluster.

This incremental data is also replicated to the raw S3 bucket through AWS DMS. HudiJob picks up the incremental data, using AWS Glue bookmarks, and applies it to the Hudi table created earlier.

  6. You can now query the changed data.

Creating your CloudFormation stack

Choose Launch Stack to get started and provide the following parameters:

  • VpcCIDR – CIDR range for the VPC.
  • PrivateSubnet1CIDR – CIDR range for the first private subnet.
  • PrivateSubnet2CIDR – CIDR range for the second private subnet.
  • PublicSubnetCIDR – CIDR range for the public subnet.
  • AuroraDBMasterUserPassword – Primary user password for the Aurora cluster.
  • RedshiftDWMasterUserPassword – Primary user password for the Amazon Redshift data warehouse.
  • KeyName – The EC2 key pair to be configured in the EC2 instance on the public subnet. This EC2 instance is used to get to the Aurora cluster in the private subnet. Select the value from the dropdown.
  • ClientIPCIDR – Your IP address in CIDR notation. The CloudFormation template creates a security group rule that grants ingress on port 22 to this IP address. On a Mac, you can run the following command to get your IP address: curl ipecho.net/plain ; echo /32
  • EC2ImageId – The image ID used to create the EC2 instance in the public subnet to be a jump box to connect to the source Aurora cluster. If you supply your image ID, the template uses it to create the EC2 instance.
  • HudiStorageType – Used by the AWS Glue job to determine whether to create a CoW or MoR storage type table. Enter MoR if you want to create MoR storage type tables.
  • ScheduleToRunGlueJob – The AWS Glue job runs on a schedule to pick up the new files and load them to the curated bucket. This parameter sets the schedule of the job.
  • DMSBatchUnloadIntervalInSecs – AWS DMS batches the inputs from the source and loads the output to the raw bucket. This parameter defines the frequency at which the data is loaded to the raw bucket.
  • GlueJobDPUs – The number of DPUs assigned to the two AWS Glue jobs.

To simplify running the template, your account is given permissions on the key used to encrypt the resources in the CloudFormation template. You can restrict that to the role if desired.

Granting Lake Formation permissions

AWS Lake Formation enables customers to set up fine-grained access control for their data lake. Detailed steps to set up AWS Lake Formation can be found here.

Setting up AWS Lake Formation is out of scope for this post. However, if you have Lake Formation configured in the Region where you’re deploying this template, grant Create database permission to the LakeHouseExecuteGlueHudiJobRole role after the CloudFormation stack is successfully created.

This will ensure that you don’t get the following error while running your AWS Glue job.

org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Insufficient Lake Formation permission(s) on global_temp

Similarly, grant Describe permission to the LakeHouseExecuteGlueHudiJobRole role on the default database.

This will ensure that you don’t get the following error while running your AWS Glue job.

AnalysisException: 'java.lang.RuntimeException: MetaException(message:Unable to verify existence of default database: com.amazonaws.services.glue.model.AccessDeniedException: Insufficient Lake Formation permission(s) on default (Service: AWSGlue; Status Code: 400; Error Code: AccessDeniedException;

Connecting to source Aurora cluster

To connect to the source Aurora cluster using SQL Workbench, complete the following steps:

  1. On SQL Workbench, under File, choose Connect window.

  2. Choose Manage Drivers.
  3. Choose PostgreSQL.
  4. For Library, use the driver JAR file.
  5. For Classname, enter org.postgresql.Driver.
  6. For Sample URL, enter jdbc:postgresql://host:port/name_of_database.

  7. Click the Create a new connection profile button.
  8. For Driver, choose your new PostgreSQL driver.
  9. For URL, enter lakehouse_source_db after port/.
  10. For Username, enter postgres.
  11. For Password, enter the same password that you used for the AuroraDBMasterUserPassword parameter while creating the CloudFormation stack.
  12. Choose SSH.
  13. On the Outputs tab of your CloudFormation stack, copy the IP address next to PublicIPOfEC2InstanceForTunnel and enter it for SSH hostname.
  14. For SSH port, enter 22.
  15. For Username, enter ec2-user.
  16. For Private key file, enter the private key for the public key chosen in the KeyName parameter of the CloudFormation stack.
  17. For Local port, enter any available local port number.
  18. On the Outputs tab of your stack, copy the value next to EndpointOfAuroraCluster and enter it for DB hostname.
  19. For DB port, enter 5432.
  20. Select Rewrite JDBC URL.


Selecting the Rewrite JDBC URL check box automatically populates the host and port values in the URL text box, as shown below.

  21. Test the connection and make sure that you get a message that the connection was successful.

 

Troubleshooting

Complete the following steps if you receive this message: Could not initialize SSH tunnel: java.net.ConnectException: Operation timed out (Connection timed out)

  1. Go to your CloudFormation stack and search for LakeHouseSecurityGroup under Resources.
  2. Choose the link in the Physical ID.
  3. Select your security group.
  4. From the Actions menu, choose Edit inbound rules.
  5. Look for the rule with the description: Rule to allow connection from the SQL client to the EC2 instance used as jump box for SSH tunnel.
  6. From the Source menu, choose My IP.
  7. Choose Save rules.
  8. Test the connection from your SQL Workbench again and make sure that you get a successful message.

Running the initial load script

You’re now ready to run the InitLoad_TestStep1.sql script to create some test data.

  1. Open InitLoad_TestStep1.sql in your SQL client and run it.

The output shows that 11 statements have been run.

AWS DMS replicates these inserts to your raw S3 bucket at the frequency set in the DMSBatchUnloadIntervalInSecs parameter of your CloudFormation stack.

  2. On the AWS DMS console, choose the lakehouse-aurora-src-to-raw-s3-tgt task.
  3. On the Table statistics tab, you should see that the seven full load rows of employee_details have been replicated.

The lakehouse-aurora-src-to-raw-s3-tgt replication task has the following table mapping with transformation to add a schema name and a table name as additional columns:

{
   "rules":[
      {
         "rule-type":"selection",
         "rule-id":"1",
         "rule-name":"1",
         "object-locator":{
            "schema-name":"human_resources",
            "table-name":"%"
         },
         "rule-action":"include",
         "filters":[
            
         ]
      },
      {
         "rule-type":"transformation",
         "rule-id":"2",
         "rule-name":"2",
         "rule-target":"column",
         "object-locator":{
            "schema-name":"%",
            "table-name":"%"
         },
         "rule-action":"add-column",
         "value":"schema_name",
         "expression":"$SCHEMA_NAME_VAR",
         "data-type":{
            "type":"string",
            "length":50
         }
      },
      {
         "rule-type":"transformation",
         "rule-id":"3",
         "rule-name":"3",
         "rule-target":"column",
         "object-locator":{
            "schema-name":"%",
            "table-name":"%"
         },
         "rule-action":"add-column",
         "value":"table_name",
         "expression":"$TABLE_NAME_VAR",
         "data-type":{
            "type":"string",
            "length":50
         }
      }
   ]
}

These settings put the name of the source schema and table as two additional columns in the output Parquet file of AWS DMS.
These columns are used in the AWS Glue HudiJob to find out the tables that have new inserts, updates, or deletes.

  4. On the Resources tab of the CloudFormation stack, locate RawS3Bucket.
  5. Choose the Physical ID link.
  6. Navigate to human_resources/employee_details.

The LOAD00000001.parquet file is created under human_resources/employee_details. (The name of your raw bucket is different from the following screenshot).

You can also see the time of creation of this file. You should have at least one successful run of the AWS Glue job (HudiJob) after this time for the Hudi table to be created. The AWS Glue job is configured to load this data into the curated bucket at the frequency set in the ScheduleToRunGlueJob parameter of your CloudFormation stack. The default is 5 minutes.

AWS Glue job HudiJob

The following code is the script for HudiJob:

import sys
import os
import json

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import concat, col, lit, to_timestamp

from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

import boto3
from botocore.exceptions import ClientError

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
glueContext = GlueContext(spark.sparkContext)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()

logger.info('Initialization.')
glueClient = boto3.client('glue')
ssmClient = boto3.client('ssm')
redshiftDataClient = boto3.client('redshift-data')

logger.info('Fetching configuration.')
region = os.environ['AWS_DEFAULT_REGION']

curatedS3BucketName = ssmClient.get_parameter(Name='lakehouse-curated-s3-bucket-name')['Parameter']['Value']
rawS3BucketName = ssmClient.get_parameter(Name='lakehouse-raw-s3-bucket-name')['Parameter']['Value']
hudiStorageType = ssmClient.get_parameter(Name='lakehouse-hudi-storage-type')['Parameter']['Value']

dropColumnList = ['db','table_name','Op']

logger.info('Getting list of schema.tables that have changed.')
changeTableListDyf = glueContext.create_dynamic_frame_from_options(connection_type = 's3', connection_options = {'paths': ['s3://'+rawS3BucketName], 'groupFiles': 'inPartition', 'recurse':True}, format = 'parquet', format_options={}, transformation_ctx = 'changeTableListDyf')

logger.info('Processing starts.')
if(changeTableListDyf.count() > 0):
    logger.info('Got new files to process.')
    changeTableList = changeTableListDyf.toDF().select('schema_name','table_name').distinct().rdd.map(lambda row : row.asDict()).collect()

    for dbName in set([d['schema_name'] for d in changeTableList]):
        spark.sql('CREATE DATABASE IF NOT EXISTS ' + dbName)
        redshiftDataClient.execute_statement(ClusterIdentifier='lakehouse-redshift-cluster', Database='lakehouse_dw', DbUser='rs_admin', Sql='CREATE EXTERNAL SCHEMA IF NOT EXISTS ' + dbName + ' FROM DATA CATALOG DATABASE \'' + dbName + '\' REGION \'' + region + '\' IAM_ROLE \'' + boto3.client('iam').get_role(RoleName='LakeHouseRedshiftGlueAccessRole')['Role']['Arn'] + '\'')

    for i in changeTableList:
        logger.info('Looping for ' + i['schema_name'] + '.' + i['table_name'])
        dbName = i['schema_name']
        tableNameCatalogCheck = ''
        tableName = i['table_name']
        if(hudiStorageType == 'MoR'):
            tableNameCatalogCheck = i['table_name'] + '_ro' #Assumption is that if _ro table exists then _rt table will also exist. Hence we are checking only for _ro.
        else:
            tableNameCatalogCheck = i['table_name'] #The default config in the CF template is CoW. So assumption is that if the user hasn't explicitly requested to create MoR storage type table then we will create CoW tables. Again, if the user overwrites the config with any value other than 'MoR' we will create CoW storage type tables.
        isTableExists = False
        isPrimaryKey = False
        isPartitionKey = False
        primaryKey = ''
        partitionKey = ''
        try:
            glueClient.get_table(DatabaseName=dbName,Name=tableNameCatalogCheck)
            isTableExists = True
            logger.info(dbName + '.' + tableNameCatalogCheck + ' exists.')
        except ClientError as e:
            if e.response['Error']['Code'] == 'EntityNotFoundException':
                isTableExists = False
                logger.info(dbName + '.' + tableNameCatalogCheck + ' does not exist. Table will be created.')
        try:
            table_config = json.loads(ssmClient.get_parameter(Name='lakehouse-table-' + dbName + '.' + tableName)['Parameter']['Value'])
            try:
                primaryKey = table_config['primaryKey']
                isPrimaryKey = True
                logger.info('Primary key:' + primaryKey)
            except KeyError as e:
                isPrimaryKey = False
                logger.info('Primary key not found. An append only glueparquet table will be created.')
            try:
                partitionKey = table_config['partitionKey']
                isPartitionKey = True
                logger.info('Partition key:' + partitionKey)
            except KeyError as e:
                isPartitionKey = False
                logger.info('Partition key not found. Partitions will not be created.')
        except ClientError as e:    
            if e.response['Error']['Code'] == 'ParameterNotFound':
                isPrimaryKey = False
                isPartitionKey = False
                logger.info('Config for ' + dbName + '.' + tableName + ' not found in parameter store. Non partitioned append only table will be created.')

        inputDyf = glueContext.create_dynamic_frame_from_options(connection_type = 's3', connection_options = {'paths': ['s3://' + rawS3BucketName + '/' + dbName + '/' + tableName], 'groupFiles': 'none', 'recurse':True}, format = 'parquet',transformation_ctx = tableName)
        
        inputDf = inputDyf.toDF().withColumn('update_ts_dms',to_timestamp(col('update_ts_dms')))
        
        targetPath = 's3://' + curatedS3BucketName + '/' + dbName + '/' + tableName

        morConfig = {'hoodie.datasource.write.storage.type': 'MERGE_ON_READ', 'hoodie.compact.inline': 'false', 'hoodie.compact.inline.max.delta.commits': 20, 'hoodie.parquet.small.file.limit': 0}

        commonConfig = {'className' : 'org.apache.hudi', 'hoodie.datasource.hive_sync.use_jdbc':'false', 'hoodie.datasource.write.precombine.field': 'update_ts_dms', 'hoodie.datasource.write.recordkey.field': primaryKey, 'hoodie.table.name': tableName, 'hoodie.consistency.check.enabled': 'true', 'hoodie.datasource.hive_sync.database': dbName, 'hoodie.datasource.hive_sync.table': tableName, 'hoodie.datasource.hive_sync.enable': 'true'}

        partitionDataConfig = {'hoodie.datasource.write.partitionpath.field': partitionKey, 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'hoodie.datasource.hive_sync.partition_fields': partitionKey}
                     
        unpartitionDataConfig = {'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor', 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}
        
        incrementalConfig = {'hoodie.upsert.shuffle.parallelism': 20, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS', 'hoodie.cleaner.commits.retained': 10}
        
        initLoadConfig = {'hoodie.bulkinsert.shuffle.parallelism': 3, 'hoodie.datasource.write.operation': 'bulk_insert'}
        
        deleteDataConfig = {'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.EmptyHoodieRecordPayload'}

        if(hudiStorageType == 'MoR'):
            commonConfig = {**commonConfig, **morConfig}
            logger.info('MoR config appended to commonConfig.')
        
        combinedConf = {}

        if(isPrimaryKey):
            logger.info('Going the Hudi way.')
            if(isTableExists):
                logger.info('Incremental load.')
                outputDf = inputDf.filter("Op != 'D'").drop(*dropColumnList)
                if outputDf.count() > 0:
                    logger.info('Upserting data.')
                    if (isPartitionKey):
                        logger.info('Writing to partitioned Hudi table.')
                        outputDf = outputDf.withColumn(partitionKey,concat(lit(partitionKey+'='),col(partitionKey)))
                        combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig}
                        outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
                    else:
                        logger.info('Writing to unpartitioned Hudi table.')
                        combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig}
                        outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
                outputDf_deleted = inputDf.filter("Op = 'D'").drop(*dropColumnList)
                if outputDf_deleted.count() > 0:
                    logger.info('Some data got deleted.')
                    if (isPartitionKey):
                        logger.info('Deleting from partitioned Hudi table.')
                        outputDf_deleted = outputDf_deleted.withColumn(partitionKey,concat(lit(partitionKey+'='),col(partitionKey)))
                        combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig, **deleteDataConfig}
                        outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
                    else:
                        logger.info('Deleting from unpartitioned Hudi table.')
                        combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig, **deleteDataConfig}
                        outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
            else:
                outputDf = inputDf.drop(*dropColumnList)
                if outputDf.count() > 0:
                    logger.info('Initial load.')
                    if (isPartitionKey):
                        logger.info('Writing to partitioned Hudi table.')
                        outputDf = outputDf.withColumn(partitionKey,concat(lit(partitionKey+'='),col(partitionKey)))
                        combinedConf = {**commonConfig, **partitionDataConfig, **initLoadConfig}
                        outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)
                    else:
                        logger.info('Writing to unpartitioned Hudi table.')
                        combinedConf = {**commonConfig, **unpartitionDataConfig, **initLoadConfig}
                        outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)
        else:
            if (isPartitionKey):
                logger.info('Writing to partitioned glueparquet table.')
                sink = glueContext.getSink(connection_type = 's3', path= targetPath, enableUpdateCatalog = True, updateBehavior = 'UPDATE_IN_DATABASE', partitionKeys=[partitionKey])
            else:
                logger.info('Writing to unpartitioned glueparquet table.')
                sink = glueContext.getSink(connection_type = 's3', path= targetPath, enableUpdateCatalog = True, updateBehavior = 'UPDATE_IN_DATABASE')
            sink.setFormat('glueparquet')
            sink.setCatalogInfo(catalogDatabase = dbName, catalogTableName = tableName)
            outputDyf = DynamicFrame.fromDF(inputDf.drop(*dropColumnList), glueContext, 'outputDyf')
            sink.writeFrame(outputDyf)

job.commit()

Hudi tables need a primary key to perform upserts. Hudi tables can also be partitioned based on a certain key. We get the names of the primary key and the partition key from AWS Systems Manager Parameter Store.

The HudiJob script looks for an AWS Systems Manager Parameter with the naming format lakehouse-table-<schema_name>.<table_name>. It uses the schema and table name columns added by AWS DMS to construct the parameter name and get the primary key and the partition key for the Hudi table.

The CloudFormation template creates lakehouse-table-human_resources.employee_details AWS Systems Manager Parameter, as shown on the Resources tab.

If you choose the Physical ID link, you can locate the value of the AWS Systems Manager Parameter. The parameter has the value {"primaryKey": "emp_no", "partitionKey": "department"} in it.

Because of the value in the lakehouse-table-human_resources.employee_details AWS Systems Manager Parameter, the AWS Glue script creates a human_resources.employee_details Hudi table partitioned on the department column for the employee_details table created in the source using the InitLoad_TestStep1.sql script. The HudiJob also uses the emp_no column as the primary key for upserts.

If you reuse this CloudFormation template and create your own table, you have to create an associated AWS Systems Manager Parameter with the naming convention lakehouse-table-<schema_name>.<table_name>. Keep in mind the following:

  • If you don’t create a parameter, the script creates an unpartitioned glueparquet append-only table.
  • If you create a parameter that only has the primaryKey part in the value, the script creates an unpartitioned Hudi table.
  • If you create a parameter that only has the partitionKey part in the value, the script creates a partitioned glueparquet append-only table.

If you have too many tables to replicate, you can also store the primary key and partition key configuration in Amazon DynamoDB or Amazon S3 and change the code accordingly.
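
For example, if you were adding a hypothetical sales.orders source table, the parameter could be registered with a short boto3 call like the following sketch; the schema, table, and column names are purely illustrative.

import json
import boto3

ssm = boto3.client('ssm')

# Illustrative only: register the primary key and partition key for a
# hypothetical sales.orders table so HudiJob creates a partitioned Hudi table.
ssm.put_parameter(
    Name='lakehouse-table-sales.orders',
    Type='String',
    Overwrite=True,
    Value=json.dumps({'primaryKey': 'order_id', 'partitionKey': 'order_date'}),
)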

In the InitLoad_TestStep1.sql script, replica identity for human_resources.employee_details table is set to full. This makes sure that AWS DMS transfers the full delete record to Amazon S3. Having this delete record is important for the HudiJob script to delete the record from the Hudi table. A full delete record from AWS DMS for the human_resources.employee_details table looks like the following:

{ "Op": "D", "update_ts_dms": "2020-10-25 07:57:48.589284", "emp_no": 3, "name": "Jeff", "department": "Finance", "city": "Tokyo", "salary": 55000, "schema_name": "human_resources", "table_name": "employee_details"}

The schema_name and table_name columns are added by AWS DMS because of the task configuration shared previously. update_ts_dms has been set as the value for the TimestampColumnName S3 setting in the AWS DMS S3 endpoint. Op is added by AWS DMS for CDC and indicates the source database operation in the migrated S3 data.

We also set spark.serializer in the script. This setting is required for Hudi.

In the HudiJob script, you can also find a few Python dicts that store various Hudi configuration properties. These configurations are just for demo purposes; you have to adjust them based on your workload. For more information about Hudi configurations, see Configurations.

HudiJob is scheduled to run every 5 minutes by default. The frequency is set by the ScheduleToRunGlueJob parameter of the CloudFormation template. Make sure that you successfully run HudiJob at least one time after the source data lands in the raw S3 bucket. The screenshot in Step 6 of Running the initial load script section confirms that AWS DMS put the LOAD00000001.parquet file in the raw bucket at 11:54:41 AM and following screenshot confirms that the job execution started at 11:55 AM.

The job creates a Hudi table in the AWS Glue Data Catalog (see the following screenshot). The table is partitioned on the department column.

Granting AWS Lake Formation permissions

If you have AWS Lake Formation enabled, make sure that you grant Select permission on the human_resources.employee_details table to the role or user used to run the Athena query. Similarly, you also have to grant Select permission on the human_resources.employee_details table to the LakeHouseRedshiftGlueAccessRole role so you can query human_resources.employee_details in Amazon Redshift.

Grant Drop permission on the human_resources database to LakeHouseExecuteLambdaFnsRole so that the template can delete the database when you delete the stack. Also note that the CloudFormation template does not roll back any AWS Lake Formation grants or changes that are manually applied.
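
You can make these grants on the Lake Formation console; if you prefer to script them, a boto3 sketch along the following lines should work. The principal ARNs are placeholders, and console-based grants are equally valid.

import boto3

lf = boto3.client('lakeformation')

# Sketch only: grant Select on the Hudi table to the principal that runs the
# Athena queries (ARN is a placeholder).
lf.grant_permissions(
    Principal={'DataLakePrincipalIdentifier': '<athena-user-or-role-arn>'},
    Resource={'Table': {'DatabaseName': 'human_resources', 'Name': 'employee_details'}},
    Permissions=['SELECT'],
)

# Sketch only: grant Drop on the database to LakeHouseExecuteLambdaFnsRole so
# stack deletion can remove it (ARN is a placeholder).
lf.grant_permissions(
    Principal={'DataLakePrincipalIdentifier': '<LakeHouseExecuteLambdaFnsRole-arn>'},
    Resource={'Database': {'Name': 'human_resources'}},
    Permissions=['DROP'],
)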

Granting access to KMS key

The curated S3 bucket is encrypted by lakehouse-key, which is an AWS Key Management Service (AWS KMS) customer managed key created by AWS CloudFormation template.

To run the query in Athena, you have to add the ARN of the role/user used to run the Athena query in the Allow use of the key section in the key policy.

This will ensure that you don’t get com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; error while running your Athena query.

You might not have to make the above KMS policy change if you kept the default of granting access to the AWS account and the role or user used to run the Athena query has the necessary KMS-related policies attached to it.

Confirming job completion

When HudiJob is complete, you can see the files in the curated bucket.

  1. On the Resources tab, search for CuratedS3Bucket.
  2. Choose the Physical ID link.

The following screenshot shows the timestamp on the initial load.

  3. Navigate to the department=Finance prefix and select the Parquet file.
  4. Choose Select from.
  5. For File format, select Parquet.
  6. Choose Show file preview.

You can see the value of the timestamp in the update_ts_dms column.

Querying the Hudi table

You can now query your data in Amazon Athena or Amazon Redshift.

Querying in Amazon Athena

Query the human_resources.employee_details table in Amazon Athena with the following code:

SELECT emp_no,
         name,
         city,
         salary,
         department,
         from_unixtime(update_ts_dms/1000000,'America/Los_Angeles') update_ts_dms_LA,
         from_unixtime(update_ts_dms/1000000,'UTC') update_ts_dms_UTC         
FROM "human_resources"."employee_details"
ORDER BY emp_no

The timestamp for all the records matches the timestamp in the update_ts_dms column in the earlier screenshot.

Querying in Redshift Spectrum

You can query your Hudi table in Redshift Spectrum. For more details about Apache Hudi support in Amazon Redshift, see the Amazon Redshift documentation.

  1. On the Amazon Redshift console, locate lakehouse-redshift-cluster.
  2. Choose Query cluster.

  3. For Database name, enter lakehouse_dw.
  4. For Database user, enter rs_admin.
  5. For Database password, enter the password that you used for the RedshiftDWMasterUserPassword parameter in the CloudFormation template.

  6. Enter the following query for the human_resources.employee_details table:
    SELECT emp_no,
             name,
             city,
             salary,
             department,
             (TIMESTAMP 'epoch' + update_ts_dms/1000000 * interval '1 second') AT TIME ZONE 'utc' AT TIME ZONE 'america/los_angeles' update_ts_dms_LA,
             (TIMESTAMP 'epoch' + update_ts_dms/1000000 * interval '1 second') AT TIME ZONE 'utc' update_ts_dms_UTC
    FROM human_resources.employee_details
    ORDER BY emp_no 

The following screenshot shows the query output.

Running the incremental load script

We now run the IncrementalUpdatesAndInserts_TestStep2.sql script. The output shows that 6 statements were run.

AWS DMS now shows that it has replicated the new incremental changes. The changes are replicated at the frequency set in the DMSBatchUnloadIntervalInSecs parameter of the CloudFormation stack.

This creates another Parquet file in the raw S3 bucket.

The incremental updates are loaded into the Hudi table according to the frequency you chose for the job (the ScheduleToRunGlueJob parameter). The HudiJob script uses job bookmarks to detect the incremental load so that it only processes the new files brought in through AWS DMS.
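
The bookmark behavior depends on the job being initialized and committed and on the source read carrying a transformation_ctx. The following is a minimal sketch of that pattern (the S3 path is a placeholder for your raw bucket prefix, and the Hudi write itself is omitted):

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# transformation_ctx lets the job bookmark track which raw files were already processed.
raw_dyf = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://<raw_bucket>/human_resources/employee_details/"]},  # placeholder path
    format="parquet",
    transformation_ctx="raw_employee_details",
)

# ... the Hudi upsert happens here in the actual HudiJob script ...

job.commit()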

Confirming job completion

Make sure that HudiJob runs successfully at least one time after the incremental file arrives in the raw bucket. The previous screenshot shows that the incremental file arrived in the raw bucket at 1:18:38 PM and the following screenshot shows that the job started at 1:20 PM.

Querying the changed data

You can now check the table in Athena and Amazon Redshift. Both results show that emp_no 3 is deleted, 8 and 9 have been added, and 2 and 5 have been updated.

The following screenshot shows the results in Athena.

The following screenshot shows the results in Redshift Spectrum.

AWS Glue Job HudiMoRCompactionJob

The CloudFormation template also deploys the AWS Glue job HudiMoRCompactionJob. This job is not scheduled; you only use it if you choose the MoR storage type. To run the pipeline with the MoR storage type instead of the CoW storage type, delete the CloudFormation stack and create it again. After creation, replace CoW with MoR in the lakehouse-hudi-storage-type AWS Systems Manager parameter.
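
You can make that change on the AWS Systems Manager console, or with a short boto3 call such as the following sketch; Overwrite is required because the parameter already exists:

import boto3

ssm = boto3.client("ssm")

# Switch the pipeline from CoW to MoR by overwriting the existing parameter value.
ssm.put_parameter(
    Name="lakehouse-hudi-storage-type",
    Value="MoR",
    Type="String",
    Overwrite=True,
)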

If you use the MoR storage type, the incremental updates are stored in log files. You can’t see the updates in the _ro (read optimized) view, but you can see them in the _rt (real-time) view. The Amazon Athena documentation and Amazon Redshift documentation give more details about support and considerations for Apache Hudi.

To see the incremental data in the _ro view, run the HudiMoRCompactionJob job. For more information about Hudi storage types and views, see Hudi Dataset Storage Types and Storage Types & Views. The following code is an example of the CLI command used to run the HudiMoRCompactionJob job:

aws glue start-job-run --job-name HudiMoRCompactionJob --arguments="--DB_NAME=human_resources","--TABLE_NAME=employee_details","--IS_PARTITIONED=true"

You can decide on the frequency of running this job. You don’t have to run the job immediately after HudiJob; run it when you want the data to be available in the _ro view. You have to pass the schema name and the table name to this script so that it knows which table to compact.
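
If you want to trigger the compaction from code instead of the AWS CLI (for example, on a schedule of your choosing), the equivalent boto3 call looks like the following sketch, with the same arguments as the CLI example above:

import boto3

glue = boto3.client("glue")

# Pass the schema (database) name, table name, and partition flag to the compaction job.
glue.start_job_run(
    JobName="HudiMoRCompactionJob",
    Arguments={
        "--DB_NAME": "human_resources",
        "--TABLE_NAME": "employee_details",
        "--IS_PARTITIONED": "true",
    },
)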

Additional considerations

The JAR file we use in this post has not been tested for AWS Glue streaming jobs. Additionally, there are some hardcoded Hudi options in the HudiJob script. These options are set for the sample table that we create for this post. Update the options based on your workload. 

Conclusion

In this post, we created AWS Glue 2.0 jobs that moved the source upserts and deletes into Hudi tables. The code creates tables in the AWS Glue Data Catalog and updates partitions so you don’t have to run the crawlers to update them.

This post simplified your LakeHouse code base by giving you the benefits of Apache Hudi along with serverless AWS Glue. We also showed how to create a source-to-LakeHouse replication system using AWS Glue, AWS DMS, and Amazon Redshift with minimal overhead.


Appendix

We can write to Hudi tables because of the hudi-spark.jar file that we downloaded to our DependentJarsAndTempS3Bucket S3 bucket with the CloudFormation template. The path to this file is added as a dependency in both the AWS Glue jobs. This file is based on open-source Hudi. To create the JAR file, complete the following steps:

  1. Get Hudi 0.5.3 and unzip it using the following code:
    wget https://github.com/apache/hudi/archive/release-0.5.3.zip -O hudi-release-0.5.3.zip
    unzip hudi-release-0.5.3.zip

  2. Edit Hudi pom.xml:
    vi hudi-release-0.5.3/pom.xml

    1. Remove the following code to make the build process faster:
      <module>packaging/hudi-hadoop-mr-bundle</module>
      <module>packaging/hudi-hive-bundle</module>
      <module>packaging/hudi-presto-bundle</module>
      <module>packaging/hudi-utilities-bundle</module>
      <module>packaging/hudi-timeline-server-bundle</module>
      <module>docker/hoodie/hadoop</module>
      <module>hudi-integ-test</module>

    2. Change the versions of all three dependencies of httpcomponents to 4.4.1. The following is the original code:
      <!-- Httpcomponents -->
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>fluent-hc</artifactId>
              <version>4.3.2</version>
            </dependency>
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>httpcore</artifactId>
              <version>4.3.2</version>
            </dependency>
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>httpclient</artifactId>
              <version>4.3.6</version>
            </dependency>

      The following is the replacement code:

      <!-- Httpcomponents -->
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>fluent-hc</artifactId>
              <version>4.4.1</version>
            </dependency>
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>httpcore</artifactId>
              <version>4.4.1</version>
            </dependency>
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>httpclient</artifactId>
              <version>4.4.1</version>
            </dependency>

  3. Build the JAR file:
    mvn clean package -DskipTests -DskipITs -f <Full path of the hudi-release-0.5.3 dir>

  4. You can now get the JAR from the following location:
hudi-release-0.5.3/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.5.3-rc2.jar

The other JAR dependency used in the AWS Glue jobs is spark-avro_2.11-2.4.4.jar.


About the Author

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey at AWS, Vishal helped customers implement BI, DW, and data lake projects in the US and Australia.

Developing AWS Glue ETL jobs locally using a container

Post Syndicated from Vishal Pathak original https://aws.amazon.com/blogs/big-data/developing-aws-glue-etl-jobs-locally-using-a-container/

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. In the fourth post of the series, we discussed optimizing memory management. In this post, we focus on writing ETL scripts for AWS Glue jobs locally. AWS Glue is built on top of Apache Spark and therefore uses all the strengths of open-source technologies. AWS Glue comes with many improvements on top of Apache Spark and has its own ETL libraries that can fast-track the development process and reduce boilerplate code.

The AWS Glue team released the AWS Glue binaries, which let you set up an environment on your desktop to test your code. We have used these libraries to create an image with all the right dependencies packaged together. The image has AWS Glue 1.0, Apache Spark, OpenJDK, Maven, Python3, the AWS Command Line Interface (AWS CLI), and boto3. We have also bundled Jupyter and Zeppelin notebook servers in the image so you don’t have to configure an IDE and can start developing AWS Glue code right away.

The AWS Glue team will release new images for various AWS Glue updates. The tags of the new images follow this convention: glue_libs_<glue-version>_image_<image-version>, for example, glue_libs_1.0.0_image_01. In this name, 1.0 is the AWS Glue major version, .0 is the patch version, and 01 is the image version. The patch version is incremented for updates to the AWS Glue libraries of a major release, and the image version is incremented for the release of a new image of a major AWS Glue release. Both of these counters reset with every major AWS Glue release, so the first image released for AWS Glue 2.0 will be glue_libs_2.0.0_image_01.

We recommend pulling the highest image version for an AWS Glue major version to get the latest updates.

Prerequisites

Before you start, make sure that Docker is installed and the Docker daemon is running. For installation instructions, see the Docker documentation for Mac, Windows, or Linux. The machine running Docker hosts the AWS Glue container. Also make sure that the Docker host has at least 7 GB of disk space for the image.

For more information about restrictions when developing AWS Glue code locally, see Local Development Restrictions.

Solution overview

In this post, we use amazon/aws-glue-libs:glue_libs_1.0.0_image_01 from Docker Hub. This image has only been tested for an AWS Glue 1.0 Spark shell (both for PySpark and Scala). It hasn’t been tested for an AWS Glue 1.0 Python shell.

We organize this post into the following three sections. You only have to complete one of the three sections (not all three) depending on your requirement:

  • Setting up the container to use Jupyter or Zeppelin notebooks
  • Setting up the Docker image with PyCharm Professional
  • Running against the CLI interpreter

This post uses the following two terms frequently:

  • Client – The system from which you access the notebook. You open a web browser on this system and enter the notebook URL.
  • Host – The system that hosts the Docker daemon. The container runs on this system.

Sometimes, your client and host can be the same system.

Setting up the container to use Jupyter or Zeppelin notebooks

Setting up the container to run PySpark code in a notebook includes three high-level steps:

  1. Pulling the image from Docker Hub.
  2. Running the container.
  3. Opening the notebook.

Pulling the image from Docker Hub

If you’re running Docker on Windows, choose the Docker icon (right-click) and choose Switch to Linux containers… before pulling the image.

Open cmd on Windows or terminal on Mac and run the following command:

docker pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01

Running the container

We pulled the image from Docker Hub in the previous step. We now run a container using this image.

The general format of the run command is:

docker run -itd -p <port_on_host>:<port_on_container_either_8888_or_8080> -p 4040:4040 <credential_setup_to_access_AWS_resources> --name <container_name> amazon/aws-glue-libs:glue_libs_1.0.0_image_01 <command_to_start_notebook_server>

The code includes the following information:

  • <port_on_host> – The local port of your host that is mapped to the port of the container. For our use case, the container port is either 8888 (for a Jupyter notebook) or 8080 (for a Zeppelin notebook). To keep things simple, we use the same port number as the notebook server ports on the container in the following examples.
  • <port_on_container_either_8888_or_8080> – The port of the notebook server on the container. The default port of Jupyter is 8888; the default port of Zeppelin is 8080.
  • 4040:4040 – This is required for SparkUI. 4040 is the default port for SparkUI. For more information, see Web Interfaces.
  • <credential_setup_to_access_AWS_resources> – In this section, we go with the typical case of mounting the host’s directory, containing the credentials. We assume that your host has the credentials configured using aws configure. The flow chart in the Appendix section explains various ways to set the credentials if the assumption doesn’t hold for your environment.
  • <container_name> – The name of the container. You can use any text here.

  • amazon/aws-glue-libs:glue_libs_1.0.0_image_01 – The name of the image that we pulled in the previous step.
  • <command_to_start_notebook_server> – We run /home/zeppelin/bin/zeppelin.sh for a Zeppelin notebook and /home/jupyter/jupyter_start.sh for a Jupyter notebook. If you want to run your code against the CLI interpreter, you don’t need a notebook server and can leave this argument blank.

The following example code starts a Jupyter notebook and passes read-only credentials from a Mac or Linux host:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

The following example code starts a Jupyter notebook and passes read-write credentials from a Windows host:

docker run -itd -p 8888:8888 -p 4040:4040 -v %UserProfile%\.aws:/root/.aws:rw --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

To run a Zeppelin notebook, replace 8888:8888 with 8080:8080, glue_jupyter with glue_zeppelin, and /home/jupyter/jupyter_start.sh with /home/zeppelin/bin/zeppelin.sh. For example, the following command starts a Zeppelin notebook server and passes read-only credentials from a Mac or Linux host:

docker run -itd -p 8080:8080 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_zeppelin amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/zeppelin/bin/zeppelin.sh

You can now run the following command to make sure that the container is running:

docker ps

The Jupyter notebook is configured to allow connections from all IP addresses without authentication, and the Zeppelin notebook is configured to use anonymous access. This configuration makes sure that you can start working on your local machine with just two commands (docker pull and docker run). If your scenario mandates a different configuration, run the container without running the notebook startup script (/home/jupyter/jupyter_start.sh or /home/zeppelin/bin/zeppelin.sh). This starts the container but not the notebook server. You can then run the bash shell on the container using the following command, edit the required notebook configurations, and start the notebook server:

docker exec -it <container_name> bash

For example:

docker exec -it glue_jupyter bash

The following example code is the docker run command without the notebook server startup:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01

If you’re running the container on an Amazon Elastic Compute Cloud (Amazon EC2) instance, you have to set up your inbound rules in the security group to allow communication on the ports used by the notebook server. A broad inbound rule can create security risks. For more information, see AWS Security Best Practices.
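
For example, to open the Jupyter port only to your own IP address rather than a broad rule, you could use a call like the following sketch; the security group ID and client IP are placeholders:

import boto3

ec2 = boto3.client("ec2")

# Placeholders: replace with your instance's security group ID and your client IP.
ec2.authorize_security_group_ingress(
    GroupId="<security_group_id>",
    IpPermissions=[{
        "IpProtocol": "tcp",
        "FromPort": 8888,
        "ToPort": 8888,
        "IpRanges": [{"CidrIp": "<your_client_ip>/32", "Description": "Jupyter on the AWS Glue container"}],
    }],
)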

Opening the notebook

If your client and host are the same machine, enter the following URL for Jupyter: http://localhost:8888.

You can write PySpark code in the notebook as shown here. You can also use SQL magic (%%sql) to directly write SQL against the tables in the AWS Glue Data Catalog. If your catalog table is on top of JSON data, you have to place json-serde.jar in the /home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/jars directory of the container and restart the kernel in your Jupyter notebook. You can place the jar in this directory by first running the bash shell on the container using the following command:

docker exec -it <container_name> bash

If you have a local directory that holds your notebooks, you can mount it to /home/jupyter/jupyter_default_dir using the -v option. These notebooks are available to you when you open the Jupyter notebook URL. For example, see the following code:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro -v C:\Users\admin\Documents\notebooks:/home/jupyter/jupyter_default_dir --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

The URL for Zeppelin is http://localhost:8080.

For Zeppelin notebooks, include %spark.pyspark on the top to run PySpark code.

If your host is Amazon EC2 and your client is your laptop, replace localhost in the preceding URLs with your host’s public IP.

Depending on your network, or if you’re on a VPN, you might have to set up an SSH tunnel. The general format of the tunnel is the following code:

ssh -i <absolute_path_to_your_private_key_for_EC2> -v -N -L <port_on_client>:<ip_of_the_container>:<port_8888_or_8080> ec2-user@<public_ip_address_of_ec2_host>

Your security group controlling the EC2 instance should allow inbound on port 22 from the client. A broad inbound rule can create security risks. For more information, see AWS Security Best Practices.

You can get the <ip_of_the_container> under the IPAddress field when you run docker inspect <container_name>. For example: docker inspect glue_jupyter.

If you set up the tunnel, the URL to access the notebook is: http://localhost:<port_on_client>.

Use 8888 or 8080 for <port_8888_or_8080>, depending on if you’re running a Jupyter or Zeppelin notebook.

You can now use the following sample code to test your notebook:

from pyspark import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate()) 
inputDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://awsglue-datasets/examples/us-legislators/all/memberships.json"]}, format = "json")
inputDF.toDF().show()

Although awsglue-datasets is a public bucket, you need at least the following permissions, attached to the AWS Identity and Access Management (IAM) user used for your container, to view the data (note that s3:ListBucket applies to the bucket ARN and s3:GetObject to the objects in it):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "S3ReadOnly",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": "arn:aws:s3:::awsglue-datasets/*"
        }
    ]
}

You can also see the databases in your AWS Glue Data Catalog using the following code:

spark.sql("show databases").show()

You need AWS Glue permissions to run the preceding command. The following are the minimum permissions required to run the code. Replace <account_number> with your account number and <region> with your Region:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "GlueAccess",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetDatabases"
            ],
            "Resource": [
                "arn:aws:glue:<region>:<account_number>:database/*",
                "arn:aws:glue:<region>:<account_number>:catalog"
            ]
        }
    ]
}

Similarly, you can query the AWS Glue Data Catalog tables too. If your host is an Amazon EC2 instance, you see the catalog of the Region of your EC2 instance. If your host is local, you see the catalog of the Region set in your aws configure settings or your AWS_REGION variable.
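
For example, the following line previews a catalog table from the notebook; the database and table names are placeholders that you have to replace with objects that exist in your Data Catalog:

# Placeholders: replace with a database and table that exist in your Data Catalog.
spark.sql("SELECT * FROM <database_name>.<table_name> LIMIT 10").show()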

You can stop here if you want to develop AWS Glue code locally using only notebooks.

Setting up the Docker image with PyCharm Professional

This section talks about setting up PyCharm Professional to use the image. For this post, we use Windows. There may be a few differences when using PyCharm on a Mac.

  1. Open cmd (or terminal for Mac) and pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01 using the following command:
    docker pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01

    If you’re running Docker on Windows, choose the Docker icon (right-click) and choose Switch to Linux containers… before pulling the image.

  2. Choose the Docker icon (right-click) and choose Settings (this step isn’t required for Mac or Linux).
  3. In the General section, select Expose daemon on tcp://localhost:2375 without TLS (this step isn’t required for Mac or Linux). Note the warning listed under the checkbox. This step is based on PyCharm documentation.
  4. Choose Apply & Restart (this step isn’t required for Mac or Linux).
  5. Choose the Docker icon (right-click) and choose Restart… if the Docker doesn’t restart automatically (this step isn’t required for Mac or Linux).
  6. Open PyCharm and create a Pure Python project (if you don’t have one).
  7. Under File, choose Settings… (for Mac, under PyCharm, choose Preferences).
  8. Under Settings, choose Project Interpreter. In the following screenshot, GlueProject is the name of my project. Your project name might be different.
  9. Choose Show All… from the drop-down menu.
  10. Choose the + icon.

  11. Choose Docker.
  12. Choose New.
  13. For Name, enter a name (for example, Docker-Glue).
  14. Keep other settings at their default.
  15. If running on Windows, for Connect to Docker daemon with, select TCP socket and enter the Engine API URL.
    For this post, we enter tcp://localhost:2375 because Docker and PyCharm are on the same Windows machine.
    If running on a Mac, select Docker for Mac. No API URL is required.
  16. Make sure you see the message Connection successful.

For Windows, if you don’t see this message, Docker may not have restarted after you changed the settings in Step 4. Restart Docker and repeat these steps. For more information about connection settings, see the PyCharm documentation.

The following screenshots show steps 13-16 in Windows and Mac.

  17. Choose OK.

You should now see the image listed in the drop-down menu.

  18. Choose the image that you pulled from Docker Hub (amazon/aws-glue-libs:glue_libs_1.0.0_image_01).
  19. Choose OK.

You now see the interpreter listed.

  20. Choose OK.

This lists all the packages in the image.

  21. Choose OK.

Steps 22-27 help you get AWS Glue-related code completion suggestions from PyCharm.

  22. Download the following file: https://s3.amazonaws.com/aws-glue-jes-prod-us-east-1-assets/etl-1.0/python/PyGlue.zip.
  23. Under File, choose Settings (for Mac, under PyCharm, choose Preferences).
  24. Under Project: <Project name>, choose Project Structure.
  25. Choose Add Content Root.
  26. Choose the newly downloaded PyGlue.zip file.
  27. In the Settings window, choose OK.
  28. Choose the project (right-click) and choose New, Python File.
  29. Enter a name for the Python file and press Enter.
  30. Enter the following code in the file and save it. For more information about the minimum permissions required to run this code, see this section.
    from pyspark import SparkContext
    from awsglue.context import GlueContext
    
    glueContext = GlueContext(SparkContext.getOrCreate()) 
    inputDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://awsglue-datasets/examples/us-legislators/all/memberships.json"]}, format = "json")
    inputDF.toDF().show()
    

  31. Choose Add Configuration.
  32. Choose the + icon.
  33. Under Add New Configuration, choose Python.
  34. For Name, enter a name.
  35. For Environment variables, enter the following:
    PYTHONPATH=/home/aws-glue-libs/awsglue.zip:/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python/lib/pyspark.zip:/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python/lib/py4j-0.10.7-src.zip:/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python

  36. For Script path, select the newly created script in Step 29.
  37. For Python interpreter, choose the newly created interpreter.
  38. Choose Docker Container Settings.
  39. Under Volume bindings, choose the + icon.
  40. For Host path, add the absolute path of the .aws folder that holds the credentials and the config files.
  41. For Container path, add /root/.aws.
  42. Choose OK.
  43. For Run/Debug Configurations, choose OK.
  44. Run the code by choosing the green button on the top right.

You can also see the databases in your AWS Glue Data Catalog using the following code. For more information about the minimum permissions required to run this code, see this section.

spark.sql("show databases").show()

Similarly, you can also query the catalog tables. If your host is an Amazon EC2 instance, you see the catalog of the Region of your EC2 instance. If your host is local, you see the catalog of the Region set in your aws configure settings or your AWS_REGION variable.

PyCharm gives code completion suggestions for AWS Glue (see the following screenshot). This is possible because of the steps you completed earlier.

Running against the CLI interpreter

You can always run the bash shell on the container and run your PySpark code directly against the CLI interpreter in the container.

  1. Complete the Pulling the image from Docker Hub and Running the container steps in the Setting up the container to use Jupyter or Zeppelin notebooks section.
  2. Run the bash shell on the container by entering the following code. Replace <container_name> with the name (--name argument) you used earlier.
    docker exec -it <container_name> bash

  3. Run one of the following commands:
    1. For PySpark, enter the following code:
      /home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/bin/pyspark

    2. For Scala, enter the following code:
      /home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/bin/spark-shell

Conclusion

In this post, we learned about a three-step process to get started with AWS Glue and Jupyter or Zeppelin notebooks. Although notebooks are a great way to get started and a great asset to data scientists and data wranglers, data engineers generally have a source control repository, an IDE, and a well-defined CI/CD process. Because PyCharm is a widely used IDE for PySpark development, we showed how to use the image with PyCharm Professional. You can develop your code locally in your IDE and test it locally using the container, and your CI/CD process can run as it does with any other IDE and source control tool in your organization. Although we showed integration with PyCharm, you can similarly integrate the container with any IDE that you use to complete your CI/CD story with AWS Glue.


Appendix

The following section discusses various ways to set the credentials to access AWS resources (such as Amazon Simple Storage Service (Amazon S3), AWS Step Functions, and more) from the container.

You need to provide your AWS credentials to connect to an AWS service from the container. The AWS SDKs and CLIs use provider chains to look for AWS credentials in several different places, including system or user environment variables and in local AWS configuration files. For more information about how to set up credentials, see https://docs.aws.amazon.com/sdk-for-java/v2/developer-guide/credentials.html. To generate the credentials using the AWS Management Console, see Managing Access Keys (Console). For instructions on generating credentials with the AWS CLI, see create-access-key. For more information about generating credentials with an API, see CreateAccessKey.

The following flow chart shows the various ways to set up AWS credentials for the container. Most of these mechanisms don’t work with PyCharm because we use the image there and not the container. You can use the container as an SSH interpreter in PyCharm and then use one of the credential setting mechanisms listed here. However, that discussion is out of the scope of this post.

Note that the numbers in brackets match the code snippets that follow the chart.

(1) To find more info about the syntax of setting up the tunnel, see this.

(2) To set credentials using the docker cp command to copy credentials from the Windows host to the container, enter the following code (this example code uses the container name glue_jupyter):

docker cp %UserProfile%\.aws\.  glue_jupyter:/root/.aws

(3) To mount the host’s .aws directory on the container with the rw option, see this.

(4) To mount the host’s .aws directory on the container with the ro option, see this.

(5) To set the credentials in a file, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 --env-file /datalab_pocs/glue_local/env_variables.txt --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

/datalab_pocs/glue_local/env_variables.txt is the absolute path of the file holding the environment variables. The file should have the following variables:

  • AWS_ACCESS_KEY_ID=<Access_id>
  • AWS_SECRET_ACCESS_KEY=<Access_key>
  • AWS_REGION=<Region>

For more information about Regions, see Regions, Availability Zones, and Local Zones.

(6) To set the credentials in the docker run command, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 -e AWS_ACCESS_KEY_ID=<ID> -e AWS_SECRET_ACCESS_KEY=<Key> -e AWS_REGION=<Region>  --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

(7) To set credentials using aws configure on the container, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh
docker exec -it glue_jupyter bash
aws configure


About the Author

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey at AWS, Vishal helped customers implement BI, DW, and data lake projects in the US and Australia.