All posts by Avinash Desireddy

Build end-to-end Apache Spark pipelines with Amazon MWAA, Batch Processing Gateway, and Amazon EMR on EKS clusters

Post Syndicated from Avinash Desireddy original https://aws.amazon.com/blogs/big-data/build-end-to-end-apache-spark-pipelines-with-amazon-mwaa-batch-processing-gateway-and-amazon-emr-on-eks-clusters/

Apache Spark workloads running on Amazon EMR on EKS form the foundation of many modern data platforms. EMR on EKS offers benefits by providing managed Spark that integrates seamlessly with other AWS services and your organization’s existing Kubernetes-based deployment patterns.

Data platforms processing large-scale data volumes often require multiple EMR on EKS clusters. In the post Use Batch Processing Gateway to automate job management in multi-cluster Amazon EMR on EKS environments, we introduced Batch Processing Gateway (BPG) as a solution for managing Spark workloads across these clusters. Although BPG provides foundational functionality to distribute workloads and support routing for Spark jobs in multi-cluster environments, enterprise data platforms require additional features for a comprehensive data processing pipeline.

This post shows how to enhance the multi-cluster solution by integrating Amazon Managed Workflows for Apache Airflow (Amazon MWAA) with BPG. By using Amazon MWAA, we add job scheduling and orchestration capabilities, enabling you to build a comprehensive end-to-end Spark-based data processing pipeline.

Overview of solution

Consider HealthTech Analytics, a healthcare analytics company managing two distinct data processing workloads. Their Clinical Insights Data Science team processes sensitive patient outcome data requiring HIPAA compliance and dedicated resources, and their Digital Analytics team handles website interaction data with more flexible requirements. As their operation grows, they face increasing challenges in managing these diverse workloads efficiently.

The company needs to maintain strict separation between protected health information (PHI) and non-PHI data processing, while also addressing different cost center requirements. The Clinical Insights Data Science team runs critical end-of-day batch processes that need guaranteed resources, whereas the Digital Analytics team can use cost-optimized spot instances for their variable workloads. Additionally, data scientists from both teams require environments for experimentation and prototyping as needed.

This scenario presents an ideal use case for implementing a data pipeline using Amazon MWAA, BPG, and multiple EMR on EKS clusters. The solution needs to route different Spark workloads to appropriate clusters based on security requirements and cost profiles, while maintaining the necessary isolation and compliance controls. To effectively manage such an environment, we need a solution that maintains clean separation between application and infrastructure management concerns and stitching together multiple components into a robust pipeline.

Our solution consists of integrating Amazon MWAA with BPG through an Airflow custom operator for BPG called BPGOperator. This operator encapsulates the infrastructure management logic needed to interact with BPG. BPGOperator provides a clean interface for job submission through Amazon MWAA. When executed, the operator communicates with BPG, which then routes the Spark workloads to available EMR on EKS clusters based on predefined routing rules.

The following architecture diagram illustrates the components and their interactions.

Image showing the end to end architecture for end-to-end pipeline

The solution works through the following steps:

  • Amazon MWAA executes scheduled DAGs using BPGOperator. Data engineers create DAGs using this operator, requiring only the Spark application configuration file and basic scheduling parameters.
  • BPGOperator authenticates and submits jobs to the BPG submit endpoint POST:/apiv2/spark. It handles all HTTP communication details, manages authentication tokens, and provides secure transmission of job configurations.
  • BPG routes submitted jobs to EMR on EKS clusters based on predefined routing rules. These routing rules are managed centrally through BPG configuration, allowing rules-based distribution of workloads across multiple clusters.
  • BPGOperator monitors job status, captures logs, and handles execution retries. It polls the BPG job status endpoint GET:/apiv2/spark/{subID}/status and streams logs to Airflow by polling the GET:/apiv2/log endpoint every second. The BPG log endpoint retrieves the most current log information directly from the Spark Driver Pod.
  • The DAG execution progresses to subsequent tasks based on job completion status and defined dependencies. BPGOperator communicates the job status through Airflow’s built-in task communication system, enabling complex workflow orchestration.

Refer to the BPG REST API interface documentation for additional details.

This architecture provides several key benefits:

  • Separation of responsibilities – Data Engineering and Platform Engineering teams in enterprise organizations typically maintain distinct responsibilities. The modular design in this solution enables platform engineers to configure BPGOperator and manage EMR on EKS clusters, while data engineers maintain DAGs.
  • Centralized code managementBPGOperator encapsulates all core functionalities required for Amazon MWAA DAGs to submit Spark jobs through BPG into a single, reusable Python module. This centralization minimizes code duplication across DAGs and improves maintainability by providing a standardized interface for job submissions.

Airflow custom operator for BPG

An Airflow Operator is a template for a predefined Task that you can define declaratively inside your DAGs. Airflow provides multiple built-in operators such as BashOperator, which executes bash commands, PythonOperator, which executes Python functions, and EmrContainerOperator, which submits new jobs to an EMR on EKS cluster. However, no built-in operators exist to implement all the steps required for the Amazon MWAA integration with BPG.

Airflow allows you to create new operators to suit your specific requirements. This operator type is known as a custom operator. A custom operator encapsulates the custom infrastructure-related logic in a single, maintainable component. Custom operators are created by extending the airflow.models.baseoperator.BaseOperator class. We have developed and open sourced an Airflow custom operator for BPG called BPGOperator, which implements the necessary steps to provide a seamless integration of Amazon MWAA with BPG.

The following class diagram provides a detailed view of the BPGOperator implementation.

Image showing class diagram for BPGOperator implementation

When a DAG includes a BPGOperator task, the Amazon MWAA instance triggers the operator to send a job request to BPG. The operator typically performs the following steps:

  • Initialize job BPGOperator prepares the job payload, including input parameters, configurations, connection details, and other metadata required by BPG.
  • Submit job BPGOperator handles HTTP POST requests to submit jobs to BPG endpoints with the provided configurations.
  • Monitor job execution BPGOperator checks the job status, polling BPG until the job completes successfully or fails. The monitoring process includes handling various job states, managing timeout scenarios, and responding to errors that occur during job execution.
  • Handle job completion – Upon completion, BPGOperator captures the job results, logs relevant details, and can trigger downstream tasks based on the execution outcome.

The following sequence diagram illustrates the interaction flow between the Airflow DAG, BPGOperator, and BPG.

Image showing sequence diagram for the interaction between the Airflow DAG, BPGOperator, and BPG.

Deploying the solution

In the remainder of this post, you will implement the end-to-end pipeline to run Spark jobs on multiple EMR on EKS clusters. You will begin by deploying the common components that serve as the foundation for building the pipelines. Next, you will deploy and configure BPG on an EKS cluster, followed by deploying and configuring BPGOperator on Amazon MWAA. Finally, you will execute Spark jobs on multiple EMR on EKS clusters from Amazon MWAA.

To streamline the setup process, we’ve automated the deployment of all infrastructure components required for this post, so you can focus on the essential aspects of job submission to build an end-to-end pipeline. We provide detailed information to help you understand each step, simplifying the setup while preserving the learning experience.

To showcase the solution, you will create three clusters and an Amazon MWAA environment:

  • Two EMR on EKS clusters: analytics-cluster and datascience-cluster
  • An EKS cluster: gateway-cluster
  • An Amazon MWAA environment: airflow-environment

analytics-cluster and datascience-cluster serve as data processing clusters that run Spark workloads, gateway-cluster hosts BPG, and airflow-environment hosts Airflow for job orchestration and scheduling.

You can find the code base in the GitHub repo.

Prerequisites

Before you deploy this solution, make sure that the following prerequisites are in place:

Set up common infrastructure

This step handles the setup of networking infrastructure, including virtual private cloud (VPC) and subnets, along with the configuration of AWS Identity and Access Management (IAM) roles, Amazon Simple Storage Service (Amazon S3) storage, Amazon Elastic Container Registry (Amazon ECR) repository for BPG images, Amazon Aurora PostgreSQL-Compatible Edition database, Amazon MWAA environment, and both EKS and EMR on EKS clusters with a preconfigured Spark operator. With this infrastructure automatically provisioned, you can concentrate on the subsequent steps without getting caught up in basic setup tasks.

  1. Clone the repository to your local machine and set the two environment variables. Replace <AWS_REGION> with the AWS Region where you want to deploy these resources.
    git clone https://github.com/aws-samples/sample-mwaa-bpg-emr-on-eks-spark-pipeline.git
    cd sample-mwaa-bpg-emr-on-eks-spark-pipeline
    			
    export REPO_DIR=$(pwd)
    export AWS_REGION=<AWS_REGION>

  2. Execute the following script to create the common infrastructure:
    cd ${REPO_DIR}/infra
    ./setup.sh

  3. To verify successful infrastructure deployment, navigate to the AWS CloudFormation console, open your stack, and check the Events, Resources, and Outputs tabs for completion status, details, and list of resources created.

You have completed the setup of the common components that serve as the foundation for rest of the implementation.

Set up Batch Processing Gateway

This section builds the Docker image for BPG, deploys the helm chart on the gateway-cluster EKS cluster, and exposes the BPG endpoint using Kubernetes service of type LoadBalancer. Complete the following steps:

  1. Deploy BPG on the gateway-cluster EKS cluster:
    cd ${REPO_DIR}/infra/bpg
    ./configure_bpg.sh

  2. Verify the deployment by listing the pods and viewing the pod logs:
    kubectl get pods --namespace bpg
    kubectl logs <BPG-PODNAME> --namespace bpg

    Review the logs and confirm there are no errors or exceptions.

  3. Exec into the BPG pod and verify the health check:
    kubectl exec -it <BPG-PODNAME> -n bpg -- bash
    curl -u admin:admin localhost:8080/skatev2/healthcheck/status

    The healthcheck API should return a successful response of {"status":"OK"}, confirming successful deployment of BPG on the gateway-cluster EKS cluster.

We have successfully configured BPG on gateway-cluster and set up EMR on EKS for both datascience-cluster and analytics-cluster. This is where we left off in the previous blog post. In the next steps, we will configure Amazon MWAA with BPGOperator, and then write and submit DAGs to demonstrate an end-to-end Spark-based data pipeline.

Configure the Airflow operator for BPG on Amazon MWAA

This section configures the BPGOperator plugin on the Amazon MWAA environment airflow-environment. Complete the following steps:

  1. Configure BPGOperator on Amazon MWAA:
    cd ${REPO_DIR}/bpg_operator
    ./configure_bpg_operator.sh

  2. On the Amazon MWAA console, navigate to the airflow-environment environment.
  3. Choose Open Airflow UI, and in the Airflow UI, choose the Admin dropdown menu and choose Plugins.
    You will see the BPGOperator plugin listed in the Airflow UI.
    Image showing BPGOperator plugin listed in the Airflow UI

Configure Airflow connections for BPG integration

This section guides you through setting up the Airflow connections that enable secure communication between your Amazon MWAA environment and BPG. BPGOperator uses the configured connection to authenticate and interact with BPG endpoints.

Execute the following script to configure the Airflow connection bpg_connection.

cd $REPO_DIR/airflow
./configure_connections.sh

In the Airflow UI, choose the Admin dropdown menu and choose Connections. You will see the bpg_connection listed in the Airflow UI.

Image showing Airflow Connections page with bpg_connection configured.

Configure the Airflow DAG to execute Spark jobs

This step configures an Airflow DAG to run a sample application. In this case, we will submit a DAG containing multiple sample Spark jobs using Amazon MWAA to EMR on EKS clusters using BPG. Please wait for few minutes for the DAG to appear in the Airflow UI.

cd $REPO_DIR/jobs
./configure_job.sh

Trigger the Amazon MWAA DAG

In this step, we trigger the Airflow DAG and observe the job execution behavior, including reviewing the Spark logs in the Airflow UI:

  1. In the Airflow UI, review the MWAASparkPipelineDemoJob DAG and choose the play icon trigger the DAG.
    Image showing sample Airflow Job, highlighting the play button to trigger the job
  2. Wait for DAG to complete successfully.
    Upon successful completion of the DAG, you should see Success:1 under the Runs column.
  3. In the Airflow UI, locate and choose the MWAASparkPipelineDemoJob DAG.
  4. On the Graph tab, choose any task (in this example, we select the calculate_pi task) and then choose the Logs
    Image showing the MWAASparkPipelineDemoJob's graph view
  5. View the Spark logs in the Airflow UI.
    Image showing the MWAASparkPipelineDemoJob calculate_pi task logs

Migrate existing Airflow DAGs to use BPG

In enterprise data platforms, a typical data pipeline consists of Amazon MWAA submitting Spark jobs to multiple EMR on EKS clusters using the SparkKubernetesOperator and an Airflow Connection of type Kubernetes. An Airflow Connection is a set of parameters and credentials used to establish communication between Amazon MWAA and external systems or services. A DAG refers to the connection name and connects to the external system.

The following diagram shows the typical architecture.
Image showing the existing job execution workflows not using BPG

In this setup, Airflow DAGs typically uses SparkKubernetesOperator and SparkKubernetesSensor to submit Spark jobs to a remote EMR on EKS cluster using kubernetes_conn_id=<connection_name>.

The following code snippet shows the relevant details:

# Submit Spark-Pi job using Kubernetes connection
submit_spark_pi = SparkKubernetesOperator(
	task_id='submit_spark_pi',
	namespace='default',
	application_file=spark_pi_yaml,
	kubernetes_conn_id='emr_on_eks_connection_[1|2]',  # Connection ID defined in Airflow
	dag=dag
)

To migrate the infrastructure to a BPG-based infrastructure without impacting the continuity of the environment, we can deploy a parallel infrastructure using BPG, create a new Airflow Connection for BPG, and incrementally migrate the DAGs to use the new connection. By doing so, we won’t disrupt the existing infrastructure until the BPG-based infrastructure is completely operational, including the migration of all existing DAGs.

The following diagram showcases the interim state where both the Kubernetes connection and BPG connection are operational. Blue arrows indicate the existing workflow paths, and red arrows represent the new BPG-based migration paths.

Image showing the existing workflow paths and the new bpg based migration path

The modified code snippet for the DAG is as follows:

# Submit Spark-Pi job using BPG connection
submit_spark_pi = BPGOperator(
	task_id='submit_spark_pi',
	application_file=spark_pi_yaml,
	application_file_type='yaml'
	connection_id='bpg_connection',  # Connection ID defined in Airflow
	dag=dag
)

Finally, when all the DAGs have been modified to use BPGOperator instead of SparkKubernetesOperator, you can decommission any remnants of the old workflow. The final state of the infrastructure will look like the following diagram.

Image showing the final state of the infrastructure after all the job migrations are complete.

Using this approach, we can seamlessly introduce BPG into an environment that currently uses only Amazon MWAA and EMR on EKS clusters.

Clean up

To avoid incurring future charges from the resources created in this tutorial, clean up your environment after you’ve completed the steps. You can do this by running the cleanup.sh script, which will safely remove all the resources provisioned during the setup:

cd ${REPO_DIR}/setup
./cleanup.sh

Conclusion

In the post Use Batch Processing Gateway to automate job management in multi-cluster Amazon EMR on EKS environments, we introduced Batch Processing Gateway as a solution for routing Spark workloads across multiple EMR on EKS clusters. In this post, we demonstrated how to enhance this foundation by integrating BPG with Amazon MWAA. Through our custom BPGOperator, we’ve shown how to build robust end-to-end Spark-based data processing pipelines while maintaining clear separation of responsibilities and centralized code management. Finally, we demonstrated how to seamlessly incorporate the solution into your existing Amazon MWAA and EMR on EKS data platform without impacting operational continuity.

We encourage you to experiment with this architecture in your own environment, adapting it to fit your unique workloads and operational requirements. By implementing this solution, you can build efficient and scalable data processing pipelines that use the full potential of EMR on EKS and Amazon MWAA. Explore further by deploying the solution in your AWS account while adhering to your organizational security best practices and share your experiences with the AWS Big Data community.


About the Authors

Suvojit DasguptaSuvojit Dasgupta is a Principal Data Architect at AWS. He leads a team of skilled engineers in designing and building scalable data solutions for AWS customers. He specializes in developing and implementing innovative data architectures to address complex business challenges.

Avinash DesireddyAvinash Desireddy is a Cloud Infrastructure Architect at AWS, passionate about building secure applications and data platforms. He has extensive experience in Kubernetes, DevOps, and enterprise architecture, helping customers containerize applications, streamline deployments, and optimize cloud-native environments.

Design patterns for implementing Hive Metastore for Amazon EMR on EKS

Post Syndicated from Avinash Desireddy original https://aws.amazon.com/blogs/big-data/design-patterns-for-implementing-hive-metastore-for-amazon-emr-on-eks/

In modern data architectures, the need to manage and query vast datasets efficiently, consistently, and accurately is paramount. For organizations that deal with big data processing, managing metadata becomes a critical concern. This is where Hive Metastore (HMS) can serve as a central metadata store, playing a crucial role in these modern data architectures.

HMS is a central repository of metadata for Apache Hive tables and other data lake table formats (for example, Apache Iceberg), providing clients (such as Apache Hive, Apache Spark, and Trino) access to this information using the Metastore Service API. Over time, HMS has become a foundational component for data lakes, integrating with a diverse ecosystem of open source and proprietary tools.

In non-containerized environments, there was typically only one approach to implementing HMS—running it as a service in an Apache Hadoop cluster. With the advent of containerization in data lakes through technologies such as Docker and Kubernetes, multiple options for implementing HMS have emerged. These options offer greater flexibility, allowing organizations to tailor HMS deployment to their specific needs and infrastructure.

In this post, we will explore the architecture patterns and demonstrate their implementation using Amazon EMR on EKS with Spark Operator job submission type, guiding you through the complexities to help you choose the best approach for your use case.

Solution overview

Prior to Hive 3.0, HMS was tightly integrated with Hive and other Hadoop ecosystem components. Hive 3.0 introduced a Standalone Hive Metastore. This new version of HMS functions as an independent service, decoupled from other Hive and Hadoop components such as HiveServer2. This separation enables various applications, such as Apache Spark, to interact directly with HMS without requiring a full Hive and Hadoop environment installation. You can learn more about other components of Apache Hive on the Design page.

In this post, we will use a Standalone Hive Metastore to illustrate the architecture and implementation details of various design patterns. Any reference to HMS refers to a Standalone Hive Metastore.

The HMS broadly consists of two main components:

  • Backend database: The database is a persistent data store that holds all the metadata, such as table schemas, partitions, and data locations.
  • Metastore service API: The Metastore service API is a stateless service that manages the core functionality of the HMS. It handles read and write operations to the backend database.

Containerization and Kubernetes offers various architecture and implementation options for HMS, including, running:

In this post, we’ll use Apache Spark as the data processing framework to demonstrate these three architectural patterns. However, these patterns aren’t limited to Spark and can be applied to any data processing framework, such as Hive or Trino, that relies on HMS for managing metadata and accessing catalog information.

Note that in a Spark application, the driver is responsible for querying the metastore to fetch table schemas and locations, then distributes this information to the executors. Executors process the data using the locations provided by the driver, never needing to query the metastore directly. Hence, in the three patterns described in the following sections, only the driver communicates with the HMS, not the executors.

HMS as sidecar container

In this pattern, HMS runs as a sidecar container within the same pod as the data processing framework, such as Apache Spark. This approach uses Kubernetes multi-container pod functionality, allowing both HMS and the data processing framework to operate together in the same pod. The following figure illustrates this architecture, where the HMS container is part of Spark driver pod.

HMS as sidecar container

This pattern is suited for small-scale deployments where simplicity is the priority. Because HMS is co-located with the Spark driver, it reduces network overhead and provides a straightforward setup. However, it’s important to note that in this approach HMS operates exclusively within the scope of the parent application and isn’t accessible by other applications. Additionally, row conflicts might arise when multiple jobs attempt to insert data into the same table simultaneously. To address this, you should make sure that no two jobs are writing to the same table simultaneously.

Consider this approach if you prefer a basic architecture. It’s ideal for organizations where a single team manages both the data processing framework (for example, Apache Spark) and HMS, and there’s no need for other applications to use HMS.

Cluster dedicated HMS

In this pattern, HMS runs in multiple pods managed through a Kubernetes deployment, typically within a dedicated namespace in the same data processing EKS cluster. The following figure illustrates this setup, with HMS decoupled from Spark driver pods and other workloads.

Cluster dedicated HMS

This pattern works well for medium-scale deployments where moderate isolation is enough, and compute and data needs can be handled within a few clusters. It provides a balance between resource efficiency and isolation, making it ideal for use cases where scaling metadata services independently is important, but complete decoupling isn’t necessary. Additionally, this pattern works well when a single team manages both the data processing frameworks and HMS, ensuring streamlined operations and alignment with organizational responsibilities.

By decoupling HMS from Spark driver pods, it can serve multiple clients, such as Apache Spark and Trino, while sharing cluster resources. However, this approach might lead to resource contention during periods of high demand, which can be mitigated by enforcing tenant isolation on HMS pods.

External HMS

In this architecture pattern, HMS is deployed in its own EKS cluster deployed using Kubernetes deployment and exposed as a Kubernetes Service using AWS Load Balancer Controller, separate from the data processing clusters. The following figure illustrates this setup, where HMS is configured as an external service, separate from the data processing clusters.

External HMS

This pattern suits scenarios where you want a centralized metastore service shared across multiple data processing clusters. HMS allows different data teams to manage their own data processing clusters while relying on the shared metastore for metadata management. By deploying HMS in a dedicated EKS cluster, this pattern provides maximum isolation, independent scaling, and the flexibility to operate and managed as its own independent service.

While this approach offers clear separation of concerns and the ability to scale independently, it also introduces higher operational complexity and potentially increased costs because of the need to manage an additional cluster. Consider this pattern if you have strict compliance requirements, need to ensure complete isolation for metadata services, or want to provide a unified metadata catalog service for multiple data teams. It works well in organizations where different teams manage their own data processing frameworks and rely on a shared metadata store for data processing needs. Additionally, the separation enables specialized teams to focus on their respective areas.

Deploy the solution

In the remainder of this post, you will explore the implementation details for each of the three architecture patterns, using EMR on EKS with Spark Operator job submission type as an example to demonstrate their implementation. Note that this implementation hasn’t been tested with other EMR on EKS Spark job submission types. You will begin by deploying the common components that serve as the foundation for all the architecture patterns. Next, you’ll deploy the components specific to each pattern. Finally, you’ll execute Spark jobs to connect to the HMS implementation unique to each pattern and verify the successful execution and retrieval of data and metadata.

To streamline the setup process, we’ve automated the deployment of common infrastructure components so you can focus on the essential aspects of each HMS architecture. We’ll provide detailed information to help you understand each step, simplifying the setup while preserving the learning experience.

Scenario

To showcase the patterns, you will create three clusters:

  • Two EMR on EKS clusters: analytics-cluster and datascience-cluster
  • An EKS cluster: hivemetastore-cluster

Both analytics-cluster and datascience-cluster serve as data processing clusters that run Spark workloads, while the hivemetastore-cluster hosts the HMS.

You will use analytics-cluster to illustrate the HMS as sidecar and cluster dedicated pattern. You will use all three clusters to demonstrate the external HMS pattern.

Source code

You can find the codebase in the AWS Samples GitHub repository.

Prerequisites

Before you deploy this solution, make sure that the following prerequisites are in place:

Set up common infrastructure

Begin by setting up the infrastructure components that are common to all three architectures.

  1. Clone the repository to your local machine and set the two environment variables. Replace <AWS_REGION> with the AWS Region where you want to deploy these resources.
git clone https://github.com/aws-samples/sample-emr-eks-hive-metastore-patterns.git
cd sample-emr-eks-hive-metastore-patterns
export REPO_DIR=$(pwd)
export AWS_REGION=<AWS_REGION>
  1. Execute the following script to create the shared infrastructure.
cd ${REPO_DIR}/setup
./setup.sh
  1. To verify successful infrastructure deployment, navigate to the AWS Management Console for AWS CloudFormation, select your stack, and check the Events, Resources, and Outputs tabs for completion status, details, and list of resources created.

You have completed the setup of the common components that serve as the foundation for all architectures. You will now deploy the components specific to each architecture and execute Apache Spark jobs to validate the successful implementation.

HMS in a sidecar container

To implement HMS using the sidecar container pattern, the Spark application requires setting both sidecar and catalog properties in the job configuration file.

  1. Execute the following script to configure the analytics-cluster for sidecar pattern. For this post, we stored the HMS database credentials into a Kubernetes Secret object. We recommend using Kubernetes External Secrets Operator to fetch HMS database credentials from AWS Secrets Manager.
cd ${REPO_DIR}/hms-sidecar
./configure-hms-sidecar.sh analytics-cluster
  1. Review the Spark job manifest file spark-hms-sidecar-job.yaml. This file was created by substituting variables in the spark-hms-sidecar-job.tpl template in the previous step. The following samples highlight key sections of the manifest file.
spec:
  driver:
  ...
    sidecars:
      # Hive Metastore Sidecar container
      - name: hive-metastore
        image: ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/hive-metastore:latest
        env:
          # These settings configure metastore to use Amazon Postgres RDS as backend database, connecting to it via jdbc URL.
          - name: HIVE_METASTORE_DB_TYPE
            value: postgres
          - name: HIVE_METASTORE_DB_DRIVER
            value: org.postgresql.Driver
          - name: HIVE_METASTORE_DB_URL
            value: jdbc:postgresql://${HMS_RDS_PROXY_ENDPOINT}:5432/hivemetastore
          # The warehouse location is specified as an S3 bucket
          - name: HIVE_METASTORE_WAREHOUSE_LOC
            value: s3a://${S3_BUCKET_NAME}/warehouse
          - name: AWS_REGION
            value: ${AWS_REGION}
          # The database username and password are passed via environment variables. The password is retrieved from a Kubernetes secret
          - name: HIVE_METASTORE_DB_USER
            value: hive_metastore_user
          - name: HIVE_METASTORE_DB_PASSWORD
            valueFrom:
              secretKeyRef:
                name: hms-rds-password
                key: HIVE_METASTORE_DB_PASSWORD

Spark job configuration

spec:  
  sparkConf:
    # Hive Catalog properties
    # Sets spark to use Hive as the catalog for SQL operations
    spark.sql.catalogImplementation: "hive"
    # HMS is exposed on localhost:8080 since the sidecar runs in the same pod. Spark connects to the sidecar via this URI
    spark.hadoop.hive.metastore.uris: "thrift://localhost:9083"
    # The data location is set to S3 bucket
    spark.sql.warehouse.dir: "s3a://${S3_BUCKET_NAME}/warehouse"
    # Retrieve temporary AWS credentials by assuming a role using a web identity token
    spark.hadoop.fs.s3a.aws.credentials.provider: "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
    # Configure Spark to use S3A filesystem
    spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"

Submit the Spark job and verify the HMS as sidecar container setup

In this pattern, you will submit Spark jobs in analytics-cluster. The Spark jobs will connect to the HMS service running as a sidecar container in the driver pod.

  1. Run the Spark job to verify that the setup was successful.
kubectl apply -f spark-hms-sidecar-job.yaml
  1. Describe the sparkapplication object.
kubectl get sparkapplication -n emr
kubectl describe sparkapplication spark-hms-sidecar-job --namespace emr
  1. List the pods and observe the number of containers attached to the driver pod. Wait until the Status changes from ContainerCreating to Running (should take just a few seconds).
kubectl get pods -n emr
  1. View the driver logs to validate the output.
kubectl logs spark-hms-sidecar-job-driver --namespace emr
  1. If you encounter the following error, wait for a few minutes and rerun the previous command.
Error from server (BadRequest): container "spark-kubernetes-driver" in pod "spark-hms-sidecar-driver" is waiting to start: ContainerCreating
  1. After successful completion of the job, you see the following message in the logs. The tabular output successfully validates the setup of HMS as a sidecar container.
...
24/09/17 21:44:00 INFO metastore: Trying to connect to metastore with URI thrift://localhost:9083
24/09/17 21:44:00 INFO metastore: Opened a connection to metastore, current connections: 1
24/09/17 21:44:00 INFO metastore: Connected to metastore.
...
...
+-------+---+-------+---+
|pattern|id |name   |age|
+-------+---+-------+---+
|Sidecar|1 |Alice   |30 |
|Sidecar|2 |Bob     |25 |
|Sidecar|3 |Charlie |35 |
+-------+---+-------+---+

Cluster dedicated HMS

To implement HMS using a cluster dedicated HMS pattern, the Spark application requires setting up HMS URI and catalog properties in the job configuration file.

  1. Execute the following script to configure the analytics-cluster for cluster dedicated pattern.
cd ${REPO_DIR}/hms-cluster-dedicated
./configure-hms-cluster-dedicated.sh analytics-cluster
  1. Verify the HMS deployment by listing the pods and viewing the logs. No Java exceptions in the logs confirms that the Hive Metastore service is running successfully.
kubectl get pods --namespace hive-metastore
kubectl logs <HMS-PODNAME> --namespace hive-metastore
  1. Review the Spark job manifest file, spark-hms-cluster-dedicated-job.yaml. This file is created by substituting variables in the spark-hms-cluster-dedicated-job.tpl template in the previous step. The following sample highlights key sections of the manifest file.
spec: 
  sparkConf:
    # Hive Catalog properties
    # Sets spark to use Hive as the catalog for SQL operations
    spark.sql.catalogImplementation: "hive"
    # HMS is running in a pod and we can connect to it in the same EKS cluster via this URI
    spark.hadoop.hive.metastore.uris: "thrift://hive-metastore-svc.hive-metastore.svc.cluster.local:9083"
    # The data location is set to S3 bucket
    spark.sql.warehouse.dir: "s3a://${S3_BUCKET_NAME}/warehouse"
    # Retrieve temporary AWS credentials by assuming a role using a web identity token
    spark.hadoop.fs.s3a.aws.credentials.provider: "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
    # Configure Spark to use S3A filesystem
    spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"

Submit the Spark job and verify the cluster dedicated HMS setup

In this pattern, you will submit Spark jobs in analytics-cluster. The Spark jobs will connect to the HMS service in the same data processing EKS cluster.

  1. Submit the job.
kubectl apply -f spark-hms-cluster-dedicated-job.yaml -n emr
  1. Verify the status.
kubectl get sparkapplication -n emr
kubectl describe sparkapplication spark-hms-cluster-dedicated-job --namespace emr
  1. Describe driver pod and observe the number of containers attached to the driver pod. Wait until the status changes from ContainerCreating to Running (should take just a few seconds).
kubectl get pods -n emr
  1. View the driver logs to validate the output.
kubectl logs spark-hms-cluster-dedicated-job-driver --namespace emr
  1. After successful completion of the job, you should see the following message in the logs. The tabular output successfully validates the setup of cluster dedicated HMS.
...
24/09/17 21:44:00 INFO metastore: Trying to connect to metastore with URI thrift://hive-metastore-svc.hive-metastore.svc.cluster.local:9083
24/09/17 21:44:00 INFO metastore: Opened a connection to metastore, current connections: 1
24/09/17 21:44:00 INFO metastore: Connected to metastore.
...
...
+-----------------+---+-------+---+
|pattern          |id |name   |age|
+-----------------+---+-------+---+
|Cluster Dedicated|1  |Alice  |30 |
|Cluster Dedicated|2  |Bob    |25 |
|Cluster Dedicated|3  |Charlie|35 |
+-----------------+---+-------+---+

External HMS

To implement an external HMS pattern, the Spark application requires setting up an HMS URI for the service endpoint exposed by hivemetastore-cluster.

  1. Execute the following script to configure hivemetastore-cluster for External HMS pattern.
cd ${REPO_DIR}/hms-external
./configure-hms-external.sh
  1. Review the Spark job manifest file spark-hms-external-job.yaml. This file is created by substituting variables in the spark-hms-external-job.tpl template during the setup process. The following sample highlights key sections of the manifest file.
spec:
  sparkConf:
    # Hive Catalog properties
    # Sets spark to use Hive as the catalog for SQL operations
    spark.sql.catalogImplementation: "hive"
    # HMS is running in a cluster and we can connect to it in the EKS cluster via this URI
    spark.hadoop.hive.metastore.uris: "thrift://${HMS_URI_ENDPOINT}:9083"
    # The data location is set to S3 bucket
    spark.sql.warehouse.dir: "s3a://${S3_BUCKET_NAME}/warehouse"
    # Retrieve temporary AWS credentials by assuming a role using a web identity token
    spark.hadoop.fs.s3a.aws.credentials.provider: "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
    # Configure Spark to use S3A filesystem
    spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"

Submit the Spark job and verify the HMS in a separate EKS cluster setup

To verify the setup, submit Spark jobs in analytics-cluster and datascience-cluster. The Spark jobs will connect to the HMS service in the hivemetastore-cluster.

Use the following steps for analytics-cluster and then for datascience-cluster to verify that both clusters can connect to the HMS on hivemetastore-cluster.

  1. Run the spark job to test the successful setup. Replace <CONTEXT_NAME> with Kubernetes context for analytics-cluster and then for datascience-cluster.
kubectl config use-context <CONTEXT_NAME>
kubectl apply -f spark-hms-external-job.yaml -n emr
  1. Describe the sparkapplication object.
kubectl get sparkapplication spark-hms-external-job -n emr
kubectl describe sparkapplication spark-hms-external-job --namespace emr
  1. List the pods and observe the number of containers attached to the driver pod. Wait until the status changes from ContainerCreating to Running (should take just a few seconds).
kubectl get pods -n emr
  1. View the driver logs to validate the output on the data processing cluster.
kubectl logs spark-hms-external-job-driver --namespace emr
  1. The output should look like the following. The tabular output successfully validates the setup of HMS in a separate EKS cluster.
After successful completion of the job, you should be able to see the below message in the logs.
...
24/09/17 21:44:00 INFO metastore: Trying to connect to metastore with URI  thrift://k8s-hivemeta-hmsexter-xxxxxx.elb.us-east-1.amazonaws.com:9083
24/09/17 21:44:00 INFO metastore: Opened a connection to metastore, current connections: 1
24/09/17 21:44:00 INFO metastore: Connected to metastore.
...
...
+--------+---+-------+---+
|pattern |id |name   |age|
+--------+---+-------+---+
|External|1  |Alice  |30 |
|External|2  |Bob    |25 |
|External|3  |Charlie|35 |
+--------+---+-------+---+

Clean up

To avoid incurring future charges from the resources created in this tutorial, clean up your environment after you’ve completed the steps. You can do this by running the cleanup.sh script, which will safely remove all the resources provisioned during the setup.

cd ${REPO_DIR}/setup
./cleanup.sh

Conclusion

In this post, we’ve explored the design patterns for implementing the Hive Metastore (HMS) with EMR on EKS with Spark Operator, each offering distinct advantages depending on your requirements. Whether you choose to deploy HMS as a sidecar container within the Apache Spark Driver pod, or as a Kubernetes deployment in the data processing EKS cluster, or as an external HMS service in a separate EKS cluster, the key considerations revolve around communication efficiency, scalability, resource isolation, high availability, and security.

We encourage you to experiment with these patterns in your own setups, adapting them to fit your unique workloads and operational needs. By understanding and applying these design patterns, you can optimize your Hive Metastore deployments for performance, scalability, and security in your EMR on EKS environments. Explore further by deploying the solution in your AWS account and share your experiences and insights with the community.


About the Authors

Avinash Desireddy is a Cloud Infrastructure Architect at AWS, passionate about building secure applications and data platforms. He has extensive experience in Kubernetes, DevOps, and enterprise architecture, helping customers containerize applications, streamline deployments, and optimize cloud-native environments.

Suvojit Dasgupta is a Principal Data Architect at AWS. He leads a team of skilled engineers in designing and building scalable data solutions for AWS customers. He specializes in developing and implementing innovative data architectures to address complex business challenges.