Enrich your AWS Glue Data Catalog with generative AI metadata using Amazon Bedrock

Post Syndicated from Manos Samatas original https://aws.amazon.com/blogs/big-data/enrich-your-aws-glue-data-catalog-with-generative-ai-metadata-using-amazon-bedrock/

Metadata plays an important role in using data assets to make data-driven decisions, but generating it is often a time-consuming, manual task. By harnessing the capabilities of generative AI, you can automate the generation of comprehensive metadata descriptions for your data assets based on their documentation, enhancing discoverability, understanding, and overall data governance within your AWS Cloud environment. This post shows you how to enrich your AWS Glue Data Catalog with dynamic metadata using foundation models (FMs) on Amazon Bedrock and your data documentation.

AWS Glue is a serverless data integration service that makes it straightforward for analytics users to discover, prepare, move, and integrate data from multiple sources. Amazon Bedrock is a fully managed service that offers a choice of high-performing FMs from leading AI companies like AI21 Labs, Anthropic, Cohere, Meta, Mistral AI, Stability AI, and Amazon through a single API.

Solution overview

In this solution, we automatically generate metadata for table definitions in the Data Catalog by using large language models (LLMs) through Amazon Bedrock. First, we explore the option of in-context learning, where the LLM generates the requested metadata without documentation. Then we improve the metadata generation by adding the data documentation to the LLM prompt using Retrieval Augmented Generation (RAG).

AWS Glue Data Catalog

This post uses the Data Catalog, a centralized metadata repository for your data assets across various data sources. The Data Catalog provides a unified interface to store and query information about data formats, schemas, and sources. It acts as an index to the location, schema, and runtime metrics of your data sources.

The most common method to populate the Data Catalog is to use an AWS Glue crawler, which automatically discovers and catalogs data sources. When you run the crawler, it creates metadata tables that are added to a database you specify or the default database. Each table represents a single data store.
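As a rough illustration of what setting up a crawler involves, the following sketch builds the request for the AWS Glue CreateCrawler API as it would be passed to Boto3. The crawler name, bucket path, and helper function are placeholders invented for this example; the role and database names are the ones used later in this post. The actual API calls are left commented out so the snippet runs without AWS credentials.

```python
def build_crawler_request(name, role, database, s3_path):
    """Return keyword arguments for glue_client.create_crawler()."""
    return {
        "Name": name,
        "Role": role,
        "DatabaseName": database,
        "Targets": {"S3Targets": [{"Path": s3_path}]},
    }

request = build_crawler_request(
    name="legislators-crawler",            # hypothetical crawler name
    role="GlueCrawlerRoleBlog",            # role name from the IAM policy in this post
    database="legislators",
    s3_path="s3://example-bucket/legislators/",  # hypothetical bucket
)
print(request)

# With credentials configured, the request could be submitted like this:
# import boto3
# glue_client = boto3.client("glue")
# glue_client.create_crawler(**request)
# glue_client.start_crawler(Name=request["Name"])
```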

Generative AI models

LLMs are trained on vast volumes of data and use billions of parameters to generate outputs for common tasks like answering questions, translating languages, and completing sentences. To use an LLM for a specific task like metadata generation, you need an approach to guide the model to produce the outputs you expect.

This post shows you how to generate descriptive metadata for your data with two different approaches:

  • In-context learning
  • Retrieval Augmented Generation (RAG)

The solution uses two generative AI models available in Amazon Bedrock: Anthropic’s Claude 3 for text generation and Amazon Titan Text Embeddings V2 for text retrieval tasks.

The following sections describe the implementation details of each approach using the Python programming language. You can find the accompanying code in the GitHub repository. You can implement it step by step in Amazon SageMaker Studio and JupyterLab or your own environment. If you’re new to SageMaker Studio, check out the Quick setup experience, which allows you to launch it with default settings in minutes. You can also use the code in an AWS Lambda function or your own application.

Approach 1: In-context learning

In this approach, you use an LLM to generate the metadata descriptions. You employ prompt engineering techniques to guide the LLM on the outputs you want it to generate. This approach is ideal for AWS Glue databases with a small number of tables. You can send the table information from the Data Catalog as context in your prompt without exceeding the context window (the number of input tokens that most Amazon Bedrock models accept). The following diagram illustrates this architecture.
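To get a feel for whether the catalog information fits in the prompt, you can estimate its token count before sending it. The sketch below assumes roughly four characters per token, which is a common rule of thumb for English text rather than an exact tokenizer count. The 200,000-token window and the 4,000 tokens reserved for the response are illustrative defaults, not values taken from this post, so check the limits of the model you use.

```python
import json

CHARS_PER_TOKEN = 4  # rough rule of thumb, not an exact tokenizer

def estimate_tokens(text):
    """Approximate the number of tokens in a string."""
    return len(text) // CHARS_PER_TOKEN + 1

def fits_in_context(tables, context_window=200_000, reserved_for_output=4_000):
    """Check whether the serialized catalog leaves room for the model's response."""
    catalog_json = json.dumps(tables, default=str)
    return estimate_tokens(catalog_json) <= context_window - reserved_for_output

# Small made-up table definition shaped like an AWS Glue get_tables entry
sample_tables = [
    {"Name": "persons", "StorageDescriptor": {"Columns": [{"Name": "id", "Type": "string"}]}}
]
print(fits_in_context(sample_tables))
```

If the check fails, that is a signal to move to the retrieval-based approach described next, or to send only the tables related to the one you are describing.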

Approach 2: RAG architecture

If you have hundreds of tables, adding all of the Data Catalog information as context to the prompt may lead to a prompt that exceeds the LLM’s context window. In some cases, you may also have additional content such as business requirements documents or technical documentation you want the FM to reference before generating the output. Such documents can be several pages that typically exceed the maximum number of input tokens most LLMs will accept. As a result, they can’t be included in the prompt as they are.

The solution is to use a RAG approach. With RAG, you can optimize the output of an LLM so it references an authoritative knowledge base outside of its training data sources before generating a response. RAG extends the already powerful capabilities of LLMs to specific domains or an organization’s internal knowledge base, without the need to fine-tune the model. It is a cost-effective approach to improving LLM output, so it remains relevant, accurate, and useful in various contexts.

With RAG, the LLM can reference technical documents and other information about your data before generating the metadata. As a result, the generated descriptions are expected to be richer and more accurate.

The example in this post ingests data from a public Amazon Simple Storage Service (Amazon S3) bucket: s3://awsglue-datasets/examples/us-legislators/all. The dataset contains data in JSON format about US legislators and the seats that they have held in the U.S. House of Representatives and U.S. Senate. The data documentation was retrieved from the EveryPolitician documentation (http://docs.everypolitician.org/data_structure.html) and the Popolo specification (http://www.popoloproject.com/).

The following architecture diagram illustrates the RAG approach.

 

The steps are as follows:

  1. Ingest the information from the data documentation. The documentation can be in a variety of formats. For this post, the documentation is a website.
  2. Chunk the contents of the HTML page of the data documentation. Generate and store vector embeddings for the data documentation.
  3. Fetch information for the database tables from the Data Catalog.
  4. Perform a similarity search in the vector store and retrieve the most relevant information from the vector store.
  5. Build the prompt. Provide instructions on how to create metadata and add the retrieved information and the Data Catalog table information as context. Because this is a rather small database, containing six tables, all of the information about the database is included.
  6. Send the prompt to the LLM, get the response, and update the Data Catalog.
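The steps above can be sketched end to end without any AWS calls. The example below is a toy version under stated assumptions: a bag-of-words counter stands in for the embedding model, an in-memory list stands in for the vector store, and the documentation chunks and catalog JSON are short made-up samples. It only illustrates how retrieval feeds the prompt; it is not the implementation used in the notebook.

```python
import math
import re
from collections import Counter

def embed(text):
    """Toy embedding: a bag-of-words count vector (stand-in for a real embedding model)."""
    return Counter(re.findall(r"[a-z0-9_]+", text.lower()))

def cosine(a, b):
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a if t in b)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0

def retrieve(query, chunks, k=2):
    """Step 4: return the k chunks most similar to the query."""
    q = embed(query)
    return sorted(chunks, key=lambda c: cosine(q, embed(c)), reverse=True)[:k]

def build_prompt(table, catalog_json, context_chunks):
    """Step 5: combine instructions, retrieved documentation, and catalog data."""
    docs = "\n".join(context_chunks)
    return (
        f"<documentation>{docs}</documentation>\n"
        f"Create metadata descriptions for the table {table}.\n"
        f"<glue_data_catalog>{catalog_json}</glue_data_catalog>"
    )

# Steps 1-2: documentation already split into chunks (illustrative sample text).
chunks = [
    "A person is a real person, alive or dead.",
    "A membership is a relationship between a person and an organization.",
    "An area is a geographic area whose geometry may change over time.",
]
# Step 3: catalog information (hard-coded here instead of calling AWS Glue).
catalog_json = '[{"Name": "persons"}, {"Name": "memberships"}]'

top = retrieve("person", chunks, k=1)
prompt = build_prompt("persons", catalog_json, top)
print(prompt)  # Step 6 would send this prompt to the LLM
```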

Prerequisites

To follow the steps in this post and deploy the solution in your own AWS account, refer to the GitHub repository.

You need the following prerequisite resources:

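  • An IAM role for the AWS Glue crawler with read and write access to the S3 bucket used in this post. The following is an example policy.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::aws-gen-ai-glue-metadata-*/*"
            ]
        }
    ]
}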
  • An IAM role for your notebook environment. The IAM role should have the appropriate permissions for AWS Glue, Amazon Bedrock, and Amazon S3. The following is an example policy. You can apply additional conditions to restrict it further for your own environment.
{
      "Version": "2012-10-17",
      "Statement": [
           {
                 "Sid": "GluePermissions",
                 "Effect": "Allow",
                 "Action": [
                      "glue:GetCrawler",
                      "glue:DeleteDatabase",
                      "glue:GetTables",
                      "glue:DeleteCrawler",
                      "glue:StartCrawler",
                      "glue:CreateDatabase",
                      "glue:UpdateTable",
                      "glue:DeleteTable",
                      "glue:UpdateCrawler",
                      "glue:GetTable",
                      "glue:CreateCrawler"
                 ],
                 "Resource": "*"
           },
           {
                 "Sid": "S3Permissions",
                 "Effect": "Allow",
                 "Action": [
                      "s3:PutObject",
                      "s3:GetObject",
                      "s3:CreateBucket",
                      "s3:ListBucket",
                      "s3:DeleteObject",
                      "s3:DeleteBucket"
                 ],
                 "Resource": [
                      "arn:aws:s3:::<bucket_name>",
                      "arn:aws:s3:::<bucket_name>/*"
                 ]
           },
           {
                 "Sid": "IAMPermissions",
                 "Effect": "Allow",
                 "Action": "iam:PassRole",
                 "Resource": "arn:aws:iam::<account_ID>:role/GlueCrawlerRoleBlog"
           },
           {
                 "Sid": "BedrockPermissions",
                 "Effect": "Allow",
                 "Action": "bedrock:InvokeModel",
                 "Resource": [
                      "arn:aws:bedrock:*::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
                      "arn:aws:bedrock:*::foundation-model/amazon.titan-embed-text-v2:0"
                 ]
           }
      ]
}
  • Model access for Anthropic’s Claude 3 and Amazon Titan Text Embeddings V2 on Amazon Bedrock.
  • The notebook glue-catalog-genai_claude.ipynb.

Set up the resources and environment

Now that you have completed the prerequisites, you can switch to the notebook environment to run the next steps. First, the notebook will create the required resources:

  • S3 bucket
  • AWS Glue database
  • AWS Glue crawler, which will run and automatically generate the database tables

After you finish the setup steps, you will have an AWS Glue database called legislators.

The crawler creates the following metadata tables:

  • persons
  • memberships
  • organizations
  • events
  • areas
  • countries

This is a semi-normalized collection of tables containing legislators and their histories.

Follow the rest of the steps in the notebook to complete the environment setup. It should only take a few minutes.

Inspect the Data Catalog

Now that you have completed the setup, you can inspect the Data Catalog to familiarize yourself with it and the metadata it captured. On the AWS Glue console, choose Databases in the navigation pane, then open the newly created legislators database. It should contain six tables, as shown in the following screenshot:

You can open any table to inspect the details. The table description and the comment for each column are empty because AWS Glue crawlers don’t populate them automatically.

You can use the AWS Glue API to programmatically access the technical metadata for each table. The following code snippet, found in the notebook for this post, uses the AWS Glue API through the AWS SDK for Python (Boto3) to retrieve the tables for a chosen database and print them on the screen for validation.

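from datetime import date, datetime

import boto3

# Assumption: the notebook creates the client and sets the database name earlier;
# they are repeated here so that the snippet stands alone.
glue_client = boto3.client('glue')
database = 'legislators'

def get_alltables(database):
    """Return every table definition in the given Data Catalog database."""
    tables = []
    # get_tables is paginated, so walk through all pages
    get_tables_paginator = glue_client.get_paginator('get_tables')
    for page in get_tables_paginator.paginate(DatabaseName=database):
        tables.extend(page['TableList'])
    return tables

def json_serial(obj):
    """Serialize the datetime values returned by the AWS Glue API for json.dumps."""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError("Type %s not serializable" % type(obj))

database_tables = get_alltables(database)

for table in database_tables:
    print(f"Table: {table['Name']}")
    print(f"Columns: {[col['Name'] for col in table['StorageDescriptor']['Columns']]}")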
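Because the crawler leaves descriptions empty, it can help to list which tables and columns still need metadata before calling the LLM. The following is a minimal sketch that works on table definitions shaped like the entries returned by get_tables. The helper name and the sample data are made up for illustration.

```python
def find_missing_metadata(tables):
    """Map each table name to what it lacks: a table description and/or column comments."""
    report = {}
    for table in tables:
        columns = table.get("StorageDescriptor", {}).get("Columns", [])
        missing_columns = [c["Name"] for c in columns if not c.get("Comment")]
        if not table.get("Description") or missing_columns:
            report[table["Name"]] = {
                "missing_description": not table.get("Description"),
                "columns_without_comment": missing_columns,
            }
    return report

# Made-up sample: one table straight from a crawler, one already documented
sample = [
    {"Name": "persons", "StorageDescriptor": {"Columns": [{"Name": "id", "Type": "string"}]}},
    {"Name": "areas", "Description": "Geographic areas.",
     "StorageDescriptor": {"Columns": [{"Name": "id", "Type": "string", "Comment": "Area ID."}]}},
]
report = find_missing_metadata(sample)
print(report)
```

In the notebook, you could pass the list returned by get_alltables(database) instead of the sample data.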

Now that you’re familiar with the AWS Glue database and tables, you can move to the next step to generate table metadata descriptions with generative AI.

Generate table metadata descriptions with Anthropic’s Claude 3 using Amazon Bedrock and LangChain

In this step, we generate technical metadata for a selected table that belongs to an AWS Glue database. This post uses the persons table. First, we get all the tables from the Data Catalog and include them as part of the prompt. Even though the code generates metadata for a single table, giving the LLM information about the whole database is useful because you want it to detect foreign keys. The notebook environment uses LangChain v0.2.1. See the following code:

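import json
import pprint

import boto3
from botocore.config import Config
from langchain_aws import ChatBedrock
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

# Assumption: the notebook creates the client and sets the model ID earlier; they are repeated
# here so that the snippet stands alone. The retry settings are illustrative.
bedrock_client = boto3.client("bedrock-runtime", config=Config(retries={"max_attempts": 3, "mode": "standard"}))
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"  # model ID from the IAM policy above

glue_data_catalog = json.dumps(get_alltables(database), default=json_serial)

# Increase or decrease temperature depending on the amount of randomness you want
# in the response. A value closer to 1 increases the amount of randomness.
model_kwargs = {
    "temperature": 0.5,
    "top_p": 0.999
}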

model = ChatBedrock(
    client = bedrock_client,
    model_id=model_id,
    model_kwargs=model_kwargs
)

table = "persons"
response_get_table = glue_client.get_table( DatabaseName = database, Name = table )
pprint.pp(response_get_table)

user_msg_template_table="""
I'd like you to create metadata descriptions for the table called {table} in your AWS Glue data catalog. Please follow these steps:
1. Review the data catalog carefully
2. Use all the data catalog information to generate the table description
3. If a column is a primary key or foreign key to another table mention it in the description.
4. In your response, reply with the entire JSON object for the table {table}
5. Remove the DatabaseName, CreatedBy, IsRegisteredWithLakeFormation, CatalogId,VersionId,IsMultiDialectView,CreateTime, UpdateTime.
6. Write the table description in the Description attribute
7. List all the table columns under the attribute "StorageDescriptor" and then the attribute Columns. Add Location, InputFormat, and SerdeInfo
8. For each column in the StorageDescriptor, add the attribute "Comment". If a table uses a composite primary key, then the order of a given column in a table’s primary key is listed in parentheses following the column name.
9. Your response must be a valid JSON object.
10. Ensure that the data is accurately represented and properly formatted within the JSON structure. The resulting JSON table should provide a clear, structured overview of the information presented in the original text.
11. If you cannot think of an accurate description of a column, say 'not available'
Here is the data catalog json in <glue_data_catalog></glue_data_catalog> tags.
<glue_data_catalog>
{data_catalog}
</glue_data_catalog>
Here is some additional information about the database in <notes></notes> tags.
<notes>
Typically foreign key columns consist of the name of the table plus the id suffix
</notes>
"""
messages = [
    ("system", "You are a helpful assistant"),
    ("user", user_msg_template_table),
]

prompt = ChatPromptTemplate.from_messages(messages)

chain = prompt | model | StrOutputParser()

# Chain Invoke

TableInputFromLLM = chain.invoke({"data_catalog": glue_data_catalog, "table": table})
print(TableInputFromLLM)

In the preceding code, you instructed the LLM to provide a JSON response that fits the TableInput object expected by the Data Catalog update API action. The following is an example response:

{
  "Name": "persons",
  "Description": "This table contains information about individual persons, including their names, identifiers, contact details, and other relevant personal data.",
  "StorageDescriptor": {
    "Columns": [
      {
        "Name": "family_name",
        "Type": "string",
        "Comment": "The family name or surname of the person."
      },
      {
        "Name": "name",
        "Type": "string",
        "Comment": "The full name of the person."
      },
      {
        "Name": "links",
        "Type": "array<struct<note:string,url:string>>",
        "Comment": "An array of links related to the person, containing a note and URL."
      },
      {
        "Name": "gender",
        "Type": "string",
        "Comment": "The gender of the person."
      },
      {
        "Name": "image",
        "Type": "string",
        "Comment": "A URL or path to an image of the person."
      },
      {
        "Name": "identifiers",
        "Type": "array<struct<scheme:string,identifier:string>>",
        "Comment": "An array of identifiers for the person, each with a scheme and identifier value."
      },
      {
        "Name": "other_names",
        "Type": "array<struct<lang:string,note:string,name:string>>",
        "Comment": "An array of other names the person may be known by, including the language, a note, and the name itself."
      },

      {
        "Name": "sort_name",
        "Type": "string",
        "Comment": "The name to be used for sorting or alphabetical ordering."
      },
      {
        "Name": "images",
        "Type": "array<struct<url:string>>",
        "Comment": "An array of URLs or paths to additional images of the person."
      },
      {
        "Name": "given_name",
        "Type": "string",
        "Comment": "The given name or first name of the person."
      },
      {
        "Name": "birth_date",
        "Type": "string",
        "Comment": "The date of birth of the person."
      },
      {
        "Name": "id",
        "Type": "string",
        "Comment": "The unique identifier for the person (likely a primary key)."
      },
      {
        "Name": "contact_details",
        "Type": "array<struct<type:string,value:string>>",
        "Comment": "An array of contact details for the person, including the type (e.g., email, phone) and the value."
      },
      {
        "Name": "death_date",
        "Type": "string",
        "Comment": "The date of death of the person, if applicable."
      }
    ],
    "Location": "s3://<your-s3-bucket>/persons/",
    "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
    "SerdeInfo": {
      "SerializationLibrary": "org.openx.data.jsonserde.JsonSerDe",
      "Parameters": {
        "paths": "birth_date,contact_details,death_date,family_name,gender,given_name,id,identifiers,image,images,links,name,other_names,sort_name"
      }
    }
  },
  "PartitionKeys": [],
  "TableType": "EXTERNAL_TABLE"
}

You can also validate the JSON generated to make sure it conforms to the format expected by the AWS Glue API:

import json

from jsonschema import validate

schema_table_input = {
    "type": "object",
    "properties": {
        "Name": {"type": "string"},
        "Description": {"type": "string"},
        "StorageDescriptor": {
            "type": "object",
            "properties": {
                "Columns": {"type": "array"},
                "Location": {"type": "string"},
                "InputFormat": {"type": "string"},
                "SerdeInfo": {"type": "object"}
            }
        }
    }
}
# Raises jsonschema.ValidationError if the response doesn't match the schema
validate(instance=json.loads(TableInputFromLLM), schema=schema_table_input)
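The validation above assumes the model returned nothing but JSON. In practice, a model may wrap the JSON in a sentence or two, which makes json.loads fail. The following sketch, using only the Python standard library, pulls the first JSON object out of a response string. The helper name and the sample response are made up for illustration.

```python
import json

def extract_json_object(text):
    """Return the first JSON object found in text, parsed into a dict."""
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode parses one JSON value starting at the given index
            obj, _ = decoder.raw_decode(text, start)
            if isinstance(obj, dict):
                return obj
        except json.JSONDecodeError:
            pass  # not valid JSON from this brace, try the next one
        start = text.find("{", start + 1)
    raise ValueError("No JSON object found in model response")

sample_response = (
    "Here is the table definition:\n"
    '{"Name": "persons", "Description": "Information about people."}\n'
    "Let me know if you need any changes."
)
table_input = extract_json_object(sample_response)
print(table_input["Description"])
```

You could then validate the returned dictionary against the schema instead of calling json.loads on the raw response.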

Now that you have generated table and column descriptions, you can update the Data Catalog.

Update the Data Catalog with metadata

In this step, use the AWS Glue API to update the Data Catalog:

response = glue_client.update_table(DatabaseName=database, TableInput= json.loads(TableInputFromLLM) )
print(f"Table {table} metadata updated!")
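Passing the model output directly to update_table trusts the LLM to reproduce column names, types, and storage settings exactly. A more conservative option, sketched below, is to start from the original table definition and copy over only the generated description and column comments. The helper name and sample data are made up for illustration; the set of keys to drop is the list the prompt already asks the model to remove.

```python
import copy

# Read-only attributes that the prompt asks the model to drop from its answer
READ_ONLY_KEYS = {
    "DatabaseName", "CreatedBy", "IsRegisteredWithLakeFormation", "CatalogId",
    "VersionId", "IsMultiDialectView", "CreateTime", "UpdateTime",
}

def merge_descriptions(original_table, generated):
    """Copy the description and column comments from the model output onto the original table."""
    table_input = {
        k: copy.deepcopy(v) for k, v in original_table.items() if k not in READ_ONLY_KEYS
    }
    if generated.get("Description"):
        table_input["Description"] = generated["Description"]
    comments = {
        col["Name"]: col["Comment"]
        for col in generated.get("StorageDescriptor", {}).get("Columns", [])
        if col.get("Comment")
    }
    # Only columns that exist in the catalog receive a comment; types stay untouched
    for col in table_input.get("StorageDescriptor", {}).get("Columns", []):
        if col["Name"] in comments:
            col["Comment"] = comments[col["Name"]]
    return table_input

original = {
    "Name": "persons", "DatabaseName": "legislators", "CreateTime": "2024-01-01",
    "StorageDescriptor": {"Columns": [{"Name": "id", "Type": "string"},
                                      {"Name": "name", "Type": "string"}]},
}
generated = {
    "Name": "persons", "Description": "People.",
    "StorageDescriptor": {"Columns": [{"Name": "id", "Type": "int", "Comment": "Unique identifier."},
                                      {"Name": "nickname", "Type": "string", "Comment": "Invented."}]},
}
merged = merge_descriptions(original, generated)
print(merged)
```

In the notebook, original would be response_get_table["Table"] and generated would be the parsed model output, with the merged result passed as TableInput.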

The following screenshot shows the persons table metadata with a description.

The following screenshot shows the table metadata with column descriptions.

Now that you have enriched the technical metadata stored in Data Catalog, you can improve the descriptions by adding external documentation.

Improve metadata descriptions by adding external documentation with RAG

In this step, we add external documentation to generate more accurate metadata. The documentation for our dataset is available online as HTML pages. We use the LangChain community HTML loader to load the content:

from langchain_community.document_loaders import AsyncHtmlLoader

# Use the community HTML loader to load the external documentation, which is published as HTML
urls = [
    "http://www.popoloproject.com/specs/person.html",
    "http://docs.everypolitician.org/data_structure.html",
    "http://www.popoloproject.com/specs/organization.html",
    "http://www.popoloproject.com/specs/membership.html",
    "http://www.popoloproject.com/specs/area.html",
]
loader = AsyncHtmlLoader(urls)
docs = loader.load()

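After you download the documents, split them into chunks and set up the embeddings model:

from langchain_aws import BedrockEmbeddings
from langchain_text_splitters import CharacterTextSplitter

text_splitter = CharacterTextSplitter(
    separator='\n',
    chunk_size=1000,
    chunk_overlap=200,
)
split_docs = text_splitter.split_documents(docs)

# Assumption: the notebook sets this earlier; the ID matches the IAM policy above
embeddings_model_id = "amazon.titan-embed-text-v2:0"
embedding_model = BedrockEmbeddings(
    client=bedrock_client,
    model_id=embeddings_model_id
)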
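To make the effect of chunk_size and chunk_overlap concrete, the following is a simplified, standard-library approximation of a separator-based splitter. It is a sketch of the general technique and not LangChain's implementation: it packs separator-delimited pieces into chunks greedily and repeats trailing pieces at the start of the next chunk, up to the overlap budget.

```python
def split_with_overlap(text, separator="\n", chunk_size=1000, chunk_overlap=200):
    """Pack separator-delimited pieces into chunks of up to chunk_size characters.

    Trailing pieces totalling at most chunk_overlap characters are repeated at the
    start of the next chunk. A single piece longer than chunk_size is kept whole.
    """
    pieces = [p for p in text.split(separator) if p]
    chunks, current = [], []
    for piece in pieces:
        candidate = separator.join(current + [piece])
        if current and len(candidate) > chunk_size:
            chunks.append(separator.join(current))
            # Drop leading pieces until the carry-over fits the overlap budget
            # and still leaves room for the incoming piece.
            while current and (
                len(separator.join(current)) > chunk_overlap
                or len(separator.join(current + [piece])) > chunk_size
            ):
                current.pop(0)
        current.append(piece)
    if current:
        chunks.append(separator.join(current))
    return chunks

example_chunks = split_with_overlap("aaaa\nbbbb\ncccc\ndddd", chunk_size=9, chunk_overlap=4)
print(example_chunks)
```

In this small example, each chunk holds two lines and shares one line with its neighbor, which is the kind of overlap that helps a retrieved chunk keep some of its surrounding context.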

Next, vectorize and store the documents locally and perform a similarity search. For production workloads, you can use a managed service for your vector store such as Amazon OpenSearch Service or a fully managed solution for implementing the RAG architecture such as Amazon Bedrock Knowledge Bases.

from langchain_community.vectorstores import FAISS  # requires the faiss-cpu package

vs = FAISS.from_documents(split_docs, embedding_model)
search_results = vs.similarity_search(
    'What standards are used in the dataset?', k=2
)
print(search_results[0].page_content)

Next, include the catalog information along with the documentation to generate more accurate metadata:

from operator import itemgetter
from langchain_core.callbacks import BaseCallbackHandler
from typing import Dict, List, Any


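# Optional callback that prints the final prompt sent to the model. It is not attached to the
# chain below; to use it, pass config={"callbacks": [PromptHandler()]} to chain.invoke().
class PromptHandler(BaseCallbackHandler):
    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> Any:
        output = "\n".join(prompts)
        print(output)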

system = "You are a helpful assistant. You do not generate any harmful content."
# specify a user message
user_msg_rag = """
Here is the guidance document you should reference when answering the user:

<documentation>{context}</documentation>
I'd like you to create metadata descriptions for the table called {table} in your AWS Glue data catalog. Please follow these steps:

1. Review the data catalog carefully.
2. Use all the data catalog information and the documentation to generate the table description.
3. If a column is a primary key or foreign key to another table mention it in the description.
4. In your response, reply with the entire JSON object for the table {table}
5. Remove the DatabaseName, CreatedBy, IsRegisteredWithLakeFormation, CatalogId,VersionId,IsMultiDialectView,CreateTime, UpdateTime.
6. Write the table description in the Description attribute. Ensure you use any relevant information from the <documentation>
7. List all the table columns under the attribute "StorageDescriptor" and then the attribute Columns. Add Location, InputFormat, and SerdeInfo
8. For each column in the StorageDescriptor, add the attribute "Comment". If a table uses a composite primary key, then the order of a given column in a table’s primary key is listed in parentheses following the column name.
9. Your response must be a valid JSON object.
10. Ensure that the data is accurately represented and properly formatted within the JSON structure. The resulting JSON table should provide a clear, structured overview of the information presented in the original text.
11. If you cannot think of an accurate description of a column, say 'not available'
<glue_data_catalog>
{data_catalog}
</glue_data_catalog>
Here is some additional information about the database in <notes></notes> tags.
<notes>
Typically foreign key columns consist of the name of the table plus the id suffix
</notes>
"""
messages = [
    ("system", system),
    ("user", user_msg_rag),
]
prompt = ChatPromptTemplate.from_messages(messages)

# Retrieve and Generate
retriever = vs.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 3},
)

chain = (  
    {"context": itemgetter("table")| retriever, "data_catalog": itemgetter("data_catalog"), "table": itemgetter("table")}
    | prompt
    | model
    | StrOutputParser()
)

TableInputFromLLM = chain.invoke({"data_catalog":glue_data_catalog, "table":table})
print(TableInputFromLLM)
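In the chain above, the retriever returns a list of document objects, so the text placed in the {context} slot is likely the string representation of that list, including metadata. A common pattern is to join only the page content before it reaches the prompt. The sketch below shows the idea with a small stand-in class (Doc is hypothetical and only mimics the page_content and metadata attributes), so it runs without LangChain installed.

```python
from dataclasses import dataclass, field

@dataclass
class Doc:
    """Stand-in for a retrieved document so the sketch runs without LangChain."""
    page_content: str
    metadata: dict = field(default_factory=dict)

def format_docs(docs):
    """Join the text of the retrieved documents and drop their metadata."""
    return "\n\n".join(doc.page_content for doc in docs)

retrieved = [
    Doc("first chunk of documentation", {"source": "http://www.popoloproject.com/specs/person.html"}),
    Doc("second chunk of documentation"),
]
context = format_docs(retrieved)
print(context)

# In the chain above, the helper could be wired in like this:
#   "context": itemgetter("table") | retriever | format_docs
```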

The following is the response from the LLM:

{
  "Name": "persons",
  "Description": "This table contains information about individual persons, including their names, identifiers, contact details, and other personal information. It follows the Popolo data specification for representing persons involved in government and organizations. The 'person_id' column relates a person to an organization through the 'memberships' table.",
  "StorageDescriptor": {
    "Columns": [
      {
        "Name": "family_name",
        "Type": "string",
        "Comment": "The family or last name of the person."
      },
      {
        "Name": "name",
        "Type": "string",
        "Comment": "The full name of the person."
      },
      {
        "Name": "links",
        "Type": "array<struct<note:string,url:string>>",
        "Comment": "An array of links related to the person, with a note and URL for each link."
      },
      {
        "Name": "gender",
        "Type": "string",
        "Comment": "The gender of the person."
      },
      {
        "Name": "image",
        "Type": "string",
        "Comment": "A URL or path to an image representing the person."
      },
      {
        "Name": "identifiers",
        "Type": "array<struct<scheme:string,identifier:string>>",
        "Comment": "An array of identifiers for the person, with a scheme and identifier value for each."
      },
      {
        "Name": "other_names",
        "Type": "array<struct<lang:string,note:string,name:string>>",
        "Comment": "An array of other names the person may be known by, with language, note, and name for each."
      },
      {
        "Name": "sort_name",
        "Type": "string",
        "Comment": "The name to be used for sorting or alphabetical ordering of the person."
      },
      {
        "Name": "images",
        "Type": "array<struct<url:string>>",
        "Comment": "An array of URLs or paths to additional images representing the person."
      },
      {
        "Name": "given_name",
        "Type": "string",
        "Comment": "The given or first name of the person."
      },
      {
        "Name": "birth_date",
        "Type": "string",
        "Comment": "The date of birth of the person."
      },
      {
        "Name": "id",
        "Type": "string",
        "Comment": "The unique identifier for the person. This is likely a primary key."
      },
      {
        "Name": "contact_details",
        "Type": "array<struct<type:string,value:string>>",
        "Comment": "An array of contact details for the person, with a type and value for each."
      },
      {
        "Name": "death_date",
        "Type": "string",
        "Comment": "The date of death of the person, if applicable."
      }
    ],
    "Location": "s3://<your-s3-bucket>/persons/",
    "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
    "SerdeInfo": {
      "SerializationLibrary": "org.openx.data.jsonserde.JsonSerDe"
    }
  }
}

Similar to the first approach, you can validate the output to make sure it conforms to the AWS Glue API.
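Beyond schema validation, it can be worth checking that the model did not rename, drop, or invent columns, because update_table replaces the table definition with whatever you send. The following sketch compares the column lists of the original table and the generated output. The helper name and sample data are made up for illustration.

```python
def check_columns(original_columns, generated_columns):
    """Compare column names and types between the catalog and the model output."""
    original = {c["Name"]: c["Type"] for c in original_columns}
    generated = {c["Name"]: c.get("Type") for c in generated_columns}
    shared = original.keys() & generated.keys()
    return {
        "missing": sorted(set(original) - set(generated)),
        "unexpected": sorted(set(generated) - set(original)),
        "type_changed": sorted(n for n in shared if original[n] != generated[n]),
    }

catalog_columns = [{"Name": "id", "Type": "string"}, {"Name": "name", "Type": "string"}]
llm_columns = [
    {"Name": "id", "Type": "int", "Comment": "Unique identifier."},
    {"Name": "nickname", "Type": "string", "Comment": "Not in the catalog."},
]
differences = check_columns(catalog_columns, llm_columns)
print(differences)
```

An empty list for every key means the generated definition keeps the original columns intact, and only then would you go on to update the Data Catalog.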

Update the Data Catalog with new metadata

Now that you have generated the metadata, you can update the Data Catalog:

response = glue_client.update_table(DatabaseName=database, TableInput= json.loads(TableInputFromLLM) )
print(f"Table {table} metadata updated!")

Let’s inspect the technical metadata generated. You should now see a newer version in the Data Catalog for the persons table. You can access schema versions on the AWS Glue console.

Note the persons table description this time. It should differ slightly from the descriptions provided earlier:

  • In-context learning table description – “This table contains information about persons, including their names, identifiers, contact details, birth and death dates, and associated images and links. The ‘id’ column is the primary key for this table.”
  • RAG table description – “This table contains information about individual persons, including their names, identifiers, contact details, and other personal information. It follows the Popolo data specification for representing persons involved in government and organizations. The ‘person_id’ column relates a person to an organization through the ‘memberships’ table.”

The RAG description shows that the LLM drew on the Popolo specification, which was part of the documentation provided to it.

Clean up

Now that you have completed the steps described in the post, don’t forget to clean up the resources with the code provided in the notebook so you don’t incur unnecessary costs.

Conclusion

In this post, we explored how you can use generative AI, specifically Amazon Bedrock FMs, to enrich the Data Catalog with dynamic metadata to improve the discoverability and understanding of existing data assets. The two approaches we demonstrated, in-context learning and RAG, showcase the flexibility of this solution. In-context learning works well for AWS Glue databases with a small number of tables, whereas the RAG approach uses external documentation to generate more accurate and detailed metadata, making it suitable for larger and more complex data landscapes. By implementing this solution, you can help your organization make more informed, data-driven decisions and get more value from its data. We encourage you to explore the resources and recommendations provided in this post to further enhance your data management practices.


About the Authors

Manos Samatas is a Principal Solutions Architect in Data and AI with Amazon Web Services. He works with government, non-profit, education and healthcare customers in the UK on data and AI projects, helping build solutions using AWS. Manos lives and works in London. In his spare time, he enjoys reading, watching sports, playing video games and socialising with friends.

Anastasia Tzeveleka is a Senior GenAI/ML Specialist Solutions Architect at AWS. As part of her work, she helps customers across EMEA build foundation models and create scalable generative AI and machine learning solutions using AWS services.

How FINRA established real-time operational observability for Amazon EMR big data workloads on Amazon EC2 with Prometheus and Grafana

Post Syndicated from Sumalatha Bachu original https://aws.amazon.com/blogs/big-data/how-finra-established-real-time-operational-observability-for-amazon-emr-big-data-workloads-on-amazon-ec2-with-prometheus-and-grafana/

This is a guest post by FINRA (Financial Industry Regulatory Authority). FINRA is dedicated to protecting investors and safeguarding market integrity in a manner that facilitates vibrant capital markets.

FINRA performs big data processing with large volumes of data and workloads with varying instance sizes and types on Amazon EMR. Amazon EMR is a cloud-based big data environment designed to process large amounts of data using open source tools such as Hadoop, Spark, HBase, Flink, Hudi, and Presto.

Monitoring EMR clusters is essential for detecting critical issues with applications, infrastructure, or data in real time. A well-tuned monitoring system helps quickly identify root causes, automate bug fixes, minimize manual actions, and increase productivity. Additionally, observing cluster performance and usage over time helps operations and engineering teams find potential performance bottlenecks and optimization opportunities to scale their clusters, thereby reducing manual actions and improving compliance with service level agreements.

In this post, we talk about our challenges and show how we built an observability framework to provide operational metrics insights for big data processing workloads on Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2) clusters.

Challenge

In today’s data-driven world, organizations strive to extract valuable insights from large amounts of data. The challenge we faced was finding an efficient way to monitor and observe big data workloads on Amazon EMR due to its complexity. Monitoring and observability for Amazon EMR solutions come with various challenges:

  • Complexity and scale – EMR clusters often process massive volumes of data across numerous nodes. Monitoring such a complex, distributed system requires handling high data throughput and achieving minimal performance impact. Managing and interpreting the large volume of monitoring data generated by EMR clusters can be overwhelming, making it difficult to identify and troubleshoot issues in a timely manner.
  • Dynamic environments – EMR clusters are often ephemeral, created and shut down based on workload demands. This dynamism makes it challenging to consistently monitor, collect metrics, and maintain observability over time.
  • Data variety – Monitoring cluster health and having visibility into clusters are crucial for detecting bottlenecks, unexpected behavior during processing, data skew, job performance issues, and so on. We need detailed observability into long-running clusters, nodes, tasks, potential data skews, stuck tasks, performance issues, and job-level metrics (like Spark and JVM). Achieving comprehensive observability across these varied data types was difficult.
  • Resource utilization – EMR clusters consist of various components and services working together, making it challenging to effectively monitor all aspects of the system. Monitoring resource utilization (CPU, memory, disk I/O) across multiple nodes to prevent bottlenecks and inefficiencies is essential but complex, especially in a distributed environment.
  • Latency and performance metrics – Capturing and analyzing latency and comprehensive performance metrics in real time to identify and resolve issues promptly is critical, but it’s challenging due to the distributed nature of Amazon EMR.
  • Centralized observability dashboards – Having a single pane of glass for all aspects of EMR cluster metrics, including cluster health, resource utilization, job execution, logs, and security, in order to provide a complete picture of the system’s performance and health, was a challenge.
  • Alerting and incident management – Setting up effective centralized alerting and notification systems was challenging. Configuring alerts for critical events or performance thresholds requires careful consideration to avoid alert fatigue while making sure important issues are addressed promptly. Without a proper alerting mechanism in place, detecting and remediating incidents such as performance slowdowns or disruptions takes time and effort.
  • Cost management – Lastly, optimizing costs while maintaining effective monitoring is an ongoing challenge. Balancing the need for comprehensive monitoring with cost constraints requires careful planning and optimization strategies to avoid unnecessary expenses while still providing adequate monitoring coverage.

Effective observability for Amazon EMR requires a combination of the right tools, practices, and strategies to address these challenges and provide reliable, efficient, and cost-effective big data processing.

The Ganglia system on Amazon EMR is designed to monitor the health of the complete cluster and all of its nodes, and it exposes several kinds of metrics, such as Hadoop, Spark, and JVM metrics. When we view the Ganglia web UI in a browser, we see an overview of the EMR cluster’s performance, detailing the load, memory usage, CPU utilization, and network traffic of the cluster through different graphs. However, with Ganglia’s deprecation announced by AWS for higher versions of Amazon EMR, it became important for FINRA to build this solution.

Solution overview

Insights drawn from the post Monitor and Optimize Analytic Workloads on Amazon EMR with Prometheus and Grafana inspired our approach. The post demonstrated how to set up a monitoring system using Amazon Managed Service for Prometheus and Amazon Managed Grafana to effectively monitor an EMR cluster and use Grafana dashboards to view metrics to troubleshoot and optimize performance issues.

Based on these insights, we completed a successful proof of concept. Next, we built our enterprise central monitoring solution with Managed Prometheus and Managed Grafana to mimic Ganglia-like metrics at FINRA. Managed Prometheus allows for real-time high-volume data collection, which scales the ingestion, storage, and querying of operational metrics as workloads increase or decrease. These metrics are fed to the Managed Grafana workspace for visualizations.

Our solution includes a data ingestion layer for every cluster, with configuration for metrics collection through a custom-built script stored in Amazon Simple Storage Service (Amazon S3). We also installed Managed Prometheus at startup for EC2 instances on Amazon EMR through a bootstrap script. Additionally, application-specific tags are defined in the configuration file to optimize inclusion and collect the specific metrics.

After Managed Prometheus (installed on EMR clusters) collects the metrics, they are sent to a remote Managed Prometheus workspace. Managed Prometheus workspaces are logical and isolated environments dedicated to Managed Prometheus servers that manage specific metrics. They also provide access control for authorizing who or what sends and receives metrics from that workspace. You can create one or more workspaces per account or application depending on the need, which facilitates better management.
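To make the remote-write step more concrete, here is a minimal sketch of the configuration fragment a Prometheus server on the cluster could use to forward metrics to a workspace. The Region, the workspace ID, and the helper name build_remote_write_config are placeholders for illustration and are not FINRA's actual configuration. The output is JSON, which is also valid YAML.

```python
import json


def build_remote_write_config(region: str, workspace_id: str) -> dict:
    """Return a Prometheus config fragment that forwards metrics to a workspace."""
    url = (
        f"https://aps-workspaces.{region}.amazonaws.com"
        f"/workspaces/{workspace_id}/api/v1/remote_write"
    )
    return {
        "remote_write": [
            {
                "url": url,
                # SigV4 signing lets the workspace authorize the sender through IAM.
                "sigv4": {"region": region},
            }
        ]
    }


if __name__ == "__main__":
    # "ws-EXAMPLE" is a hypothetical workspace ID.
    print(json.dumps(build_remote_write_config("us-east-1", "ws-EXAMPLE"), indent=2))
```

Keeping the workspace ID as a parameter makes it straightforward to point different accounts or applications at different workspaces.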

We built a mechanism to render the collected metrics on Managed Grafana dashboards, which are consumed through an endpoint. We customized the dashboards for task-level, node-level, and cluster-level metrics so they can be promoted from lower environments to higher environments. We also built several templated dashboards that display node-level metrics like OS-level metrics (CPU, memory, network, disk I/O), HDFS metrics, YARN metrics, Spark metrics, and job-level metrics (Spark and JVM), maximizing the potential for each environment through automated metric aggregation in each account.

We chose a SAML-based authentication option, which allowed us to integrate with existing Active Directory (AD) groups, helping minimize the work needed to manage user access and grant user-based Grafana dashboard access. We arranged three main groups—admins, editors, and viewers—for Grafana user authentication based on user roles.

Through monitoring automation, the desired metrics are pushed to Amazon CloudWatch. We use CloudWatch to raise alerts when a metric exceeds its desired threshold.
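As a rough illustration of threshold-based alerting, the following sketch builds the request for the CloudWatch PutMetricAlarm API. The namespace, metric name, threshold, and topic ARN are hypothetical and do not come from FINRA's setup. Requiring several consecutive breaches is one common way to reduce alert fatigue.

```python
def build_alarm_request(metric_name: str, threshold: float, topic_arn: str) -> dict:
    """Build keyword arguments for the CloudWatch PutMetricAlarm API."""
    return {
        "AlarmName": f"emr-{metric_name}-high",
        "Namespace": "Custom/EMR",  # hypothetical custom namespace
        "MetricName": metric_name,
        "Statistic": "Average",
        "Period": 300,  # evaluate 5-minute averages
        "EvaluationPeriods": 3,  # require 3 consecutive breaches before alarming
        "Threshold": threshold,
        "ComparisonOperator": "GreaterThanThreshold",
        "AlarmActions": [topic_arn],
    }


if __name__ == "__main__":
    request = build_alarm_request(
        "node_cpu_utilization",
        90.0,
        "arn:aws:sns:us-east-1:111122223333:emr-alerts",  # placeholder topic ARN
    )
    print(request)
    # With boto3 installed and credentials configured, the request could be sent with:
    #   boto3.client("cloudwatch").put_metric_alarm(**request)
```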

The following diagram illustrates the solution architecture.

Sample dashboards

The following screenshots showcase example dashboards.

Conclusion

In this post, we shared how FINRA enhanced data-driven decision-making with comprehensive EMR workload observability to optimize performance, maintain reliability, and gain critical insights into big data operations, leading to operational excellence.

FINRA’s solution enabled the operations and engineering teams to use a single pane of glass for monitoring big data workloads and quickly detecting any operational issues. The scalable solution significantly reduced time to resolution and enhanced our overall operational stance. The solution empowered the operations and engineering teams with comprehensive insights into various Amazon EMR metrics like OS levels, Spark, JMX, HDFS, and YARN, all consolidated in one place. We also extended the solution to use cases such as Amazon Elastic Kubernetes Service (Amazon EKS) clusters, including EMR on EKS clusters and other applications, establishing it as a one-stop system for monitoring metrics across our infrastructure and applications.


About the Authors

Sumalatha Bachu is Senior Director, Technology at FINRA. She manages Big Data Operations, which includes managing petabyte-scale data and processing complex workloads in the cloud. Additionally, she is an expert in developing Enterprise Application Monitoring and Observability Solutions, Operational Data Analytics, and Machine Learning Model Governance workflows. Outside of work, she enjoys doing yoga, practicing singing, and teaching in her free time.

PremKiran Bejjam is Lead Engineer Consultant at FINRA, specializing in developing resilient and scalable systems. With a keen focus on designing monitoring solutions to enhance infrastructure reliability, he is dedicated to optimizing system performance. Beyond work, he enjoys quality family time and continually seeks out new learning opportunities.

Akhil Chalamalasetty is Director, Market Regulation Technology at FINRA. He is a Big Data subject matter expert specializing in building cutting edge solutions at scale along with optimizing workloads, data, and its processing capabilities. Akhil enjoys sim racing and Formula 1 in his free time.

Streamlining AWS Glue Studio visual jobs: Building an integrated CI/CD pipeline for seamless environment synchronization

Post Syndicated from Andrei Maksimov original https://aws.amazon.com/blogs/big-data/streamlining-aws-glue-studio-visual-jobs-building-an-integrated-ci-cd-pipeline-for-seamless-environment-synchronization/

Many Amazon Web Services (AWS) customers have integrated their data across multiple sources using AWS Glue, a serverless data integration service. By providing seamless integration throughout the development lifecycle, AWS Glue enables organizations to make data-driven business decisions.

AWS Glue Studio visual jobs provide a graphical interface called the visual editor that you can use to author extract, transform, and load (ETL) jobs in AWS Glue visually. The visual editor maintains a visual representation of a variety of data sources, transformations, and data sinks. With its intuitive interface, you can easily create large-scale data integration jobs without needing coding expertise, simplifying workflows and eliminating the need for manual ETL script programming.

As data engineers increasingly rely on the AWS Glue Studio visual editor to create data integration jobs, the need for a streamlined development lifecycle and seamless synchronization between environments has become paramount. Additionally, managing versions of visual directed acyclic graphs (DAGs) is crucial for tracking changes, collaboration, and maintaining consistency across environments.

This post introduces an end-to-end solution that addresses these needs by combining the power of the AWS Glue Visual Job API, a custom AWS Glue Resource Sync Utility, and an AWS CDK based continuous integration and continuous deployment (CI/CD) pipeline.

A few common questions from our customers include:

  • What are the best practices for moving our workloads from a pre-production environment to production?
  • What are the recommended best practices for provisioning data integration components?
  • How can I build AWS Glue visual jobs in the development environment and automatically propagate them to the production account using the CI/CD pipeline?
  • How can I version control and track changes to my AWS Glue Studio visual jobs?

End-to-end development lifecycle for data integration pipeline

The software development lifecycle on AWS has six phases: plan, design, implement, test, deploy, and maintain, as shown in the following diagram.


For more information regarding each component, check out End-to-end development lifecycle for data engineers to build a data integration pipeline using AWS Glue.

AWS Glue Resource Sync Utility

As part of synchronizing AWS Glue visual jobs across different environments, requirements include:

  • Manage version control of visual DAGs by tracking changes to AWS Glue Studio visual jobs using version control systems such as Git
  • Promote AWS Glue visual jobs from a pre-production environment to a production environment
  • Transfer ownership of AWS Glue visual jobs between different AWS accounts
  • Replicate AWS Glue visual jobs from one AWS Region to another as part of a disaster recovery strategy

The AWS Glue Resource Sync Utility is a Python application developed on top of the AWS Glue Visual Job API, designed to synchronize AWS Glue Studio visual jobs across different accounts without losing the visual representation. It operates by using source and target AWS environment profiles. Optionally, a list of jobs for synchronization can be provided along with a mapping file to replace environment-specific resources.

For more information on the AWS Glue Resource Sync Utility, refer to Synchronize your AWS Glue Studio Visual Jobs to different environments.

Solution overview

As shown in the following diagram, this solution uses three separate AWS accounts. One account is designated for the development environment, another for the production environment, and a third to host the CI/CD infrastructure and pipeline.


The solution emphasizes version controlling AWS Glue Studio visual jobs by serializing them into JSON files and storing them in a Git repository. As a result, you can:

  • Track changes to your visual DAGs over time.
  • Collaborate with team members.
  • Restore and deploy visual DAGs in different environments seamlessly.
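To show why JSON serialization makes visual DAGs trackable, the following minimal sketch diffs two versions of a job definition the way a Git diff would. The job fields used here are simplified, hypothetical examples rather than the exact layout produced by the sync utility.

```python
import difflib
import json


def diff_job_definitions(old: dict, new: dict) -> list[str]:
    """Return a unified diff between two serialized job definitions."""
    # Sorting keys keeps the output stable, so only real changes show up in the diff.
    old_lines = json.dumps(old, indent=2, sort_keys=True).splitlines()
    new_lines = json.dumps(new, indent=2, sort_keys=True).splitlines()
    return list(difflib.unified_diff(old_lines, new_lines, "before", "after", lineterm=""))


if __name__ == "__main__":
    # Hypothetical, heavily simplified job definitions.
    before = {"Name": "orders-etl", "WorkerType": "G.1X", "NumberOfWorkers": 2}
    after = {"Name": "orders-etl", "WorkerType": "G.1X", "NumberOfWorkers": 10}
    print("\n".join(diff_job_definitions(before, after)))
```

Because the serialized form is plain text, a change to a single node of the DAG appears as a small, reviewable diff in a pull request.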

The AWS account responsible for hosting the CI/CD pipeline is composed of three key components:

  • Managing AWS Glue job updates – Provides smooth updates and maintenance of AWS Glue jobs.
  • Cross-account access management – Enables secure promotion of updates from the development environment to the production environment.
  • Version control integration – Incorporates serialized visual DAGs into the CI/CD pipeline for deployment to target environments.

You can create AWS Glue Studio visual jobs using the intuitive visual editor in your development account. After these jobs are configured, you can serialize the visual DAGs into JSON files and commit them to a Git repository. The CI/CD pipeline detects changes to the repository and automatically triggers the deployment process.

The pipeline includes a step where the AWS Glue Resource Sync Utility deserializes the visual DAGs from the committed JSON files and deploys them to the production environment. This approach promotes consistent deployment of jobs while maintaining their visual representation.

The solution uses the AWS Glue Visual Job API, AWS Glue Resource Sync Utility, and AWS CDK to streamline deployment across environments. It enables seamless synchronization and consistent versioning of AWS Glue jobs between development and production, preserving visual workflows and reducing manual tasks. The solution consists of two main parts:

  • Initial steps (one-time setup) – Setting up the development environment, bootstrapping AWS environments, deploying the CI/CD pipeline, and integrating the AWS Glue Resource Sync Utility
  • Day-to-day development (repeated) – Ongoing activities such as creating visual jobs, serializing them, committing changes to the repository, deploying to production through the pipeline, and verifying the jobs

The solution follows these high-level steps for the initial setup:

  1. Set up the development environment
  2. Bootstrap your AWS environments
  3. Deploy the CI/CD pipeline
  4. Configure AWS developer tools connection to GitHub
  5. Integrate the CI/CD pipeline with the AWS Glue Resource Sync Utility

The solution follows these high-level steps for the day-to-day development:

  1. Create visual jobs in the development account
  2. Serialize visual jobs
  3. Commit changes to Git repository
  4. Deploy visual jobs to production
  5. Verify visual jobs in production

Prerequisites

Before you begin, make sure you have the following:

  • GitHub account
  • Git (git command)
  • Python 3.9 or later
  • Package installer for Python (pip command)
  • AWS CDK Toolkit (cdk command) 2.155.0 or later
  • AWS CLI configured with appropriate credentials for your accounts
  • Three AWS accounts:
    • Development account
    • Production account
    • Pipeline account (for hosting the CI/CD pipeline)

Technical solution walkthrough

This section provides a detailed guide to setting up and using an automated CI/CD pipeline for AWS Glue Studio visual jobs.

Initial steps (one-time setup)

In this section, we walk through the foundational steps required to establish the CI/CD pipeline for AWS Glue Studio visual jobs. These initial steps set up the necessary infrastructure and configurations, providing a smooth and automated deployment process across your development and production environments.

Set up the development environment

To set up the development environment, follow these steps:

  1. Fork the aws-glue-cdk-baseline repository
  2. Clone the forked repository:
git clone https://github.com/<YOUR-GITHUB-USERNAME>/aws-glue-cdk-baseline.git

cd aws-glue-cdk-baseline
  3. Create and activate a Python virtual environment:
python3 -m venv .venv

# On Windows, use .venv\Scripts\activate.bat
source .venv/bin/activate
  4. Install required dependencies:
pip install -r requirements.txt

pip install -r requirements-dev.txt
  5. To configure the default settings, edit the default-config.yaml file and replace the placeholders with your AWS account details:
    • Pipeline account: awsAccountId and awsRegion
    • Development account: awsAccountId and awsRegion
    • Production account: awsAccountId and awsRegion
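Before moving on, it can help to check that the configuration is complete. The following is a minimal sketch of such a check, assuming the parsed file is a dictionary with pipelineAccount, devAccount, and prodAccount sections (the key names used by the code later in this post). The helper name and the sample values are hypothetical.

```python
REQUIRED_ACCOUNTS = ("pipelineAccount", "devAccount", "prodAccount")
REQUIRED_FIELDS = ("awsAccountId", "awsRegion")


def find_config_problems(config: dict) -> list[str]:
    """Return human-readable problems found in the parsed configuration."""
    problems = []
    for account in REQUIRED_ACCOUNTS:
        section = config.get(account) or {}
        for field in REQUIRED_FIELDS:
            if not section.get(field):
                problems.append(f"{account}.{field} is missing")
        # AWS account IDs are 12 digits; unquoted YAML numbers can lose leading zeros.
        account_id = str(section.get("awsAccountId", ""))
        if account_id and not (account_id.isdigit() and len(account_id) == 12):
            problems.append(f"{account}.awsAccountId is not a 12-digit number")
    return problems


if __name__ == "__main__":
    # Sample values standing in for the result of yaml.safe_load on default-config.yaml.
    sample = {
        "pipelineAccount": {"awsAccountId": "111111111111", "awsRegion": "us-east-1"},
        "devAccount": {"awsAccountId": "222222222222", "awsRegion": "us-east-1"},
        "prodAccount": {"awsAccountId": "333333333333"},
    }
    for problem in find_config_problems(sample):
        print(problem)
```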

Bootstrap your AWS environments

Bootstrapping prepares your AWS accounts for AWS CDK deployments. To bootstrap your AWS environments, run the following commands, replacing placeholders with your account numbers, Regions, and AWS CLI profiles:

# Bootstrap the pipeline account
cdk bootstrap aws://<PIPELINE-ACCOUNT-NUMBER>/<REGION> --profile <PIPELINE-PROFILE>

# Bootstrap the development account, trusting the pipeline account
cdk bootstrap aws://<DEV-ACCOUNT-NUMBER>/<REGION> --profile <DEV-PROFILE> --trust <PIPELINE-ACCOUNT-NUMBER>

# Bootstrap the production account, trusting the pipeline account
cdk bootstrap aws://<PROD-ACCOUNT-NUMBER>/<REGION> --profile <PROD-PROFILE> --trust <PIPELINE-ACCOUNT-NUMBER>
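The three commands differ only in the account, the profile, and the --trust flag. As a minimal sketch, they could be generated from the same values held in default-config.yaml. The helper name and the profile names are hypothetical.

```python
def build_bootstrap_commands(config: dict, profiles: dict) -> list[str]:
    """Build one `cdk bootstrap` command per account in the configuration."""
    pipeline_id = str(config["pipelineAccount"]["awsAccountId"])
    commands = []
    for key in ("pipelineAccount", "devAccount", "prodAccount"):
        account_id = str(config[key]["awsAccountId"])
        command = (
            f"cdk bootstrap aws://{account_id}/{config[key]['awsRegion']}"
            f" --profile {profiles[key]}"
        )
        # Target accounts must trust the pipeline account so it can deploy into them.
        if key != "pipelineAccount":
            command += f" --trust {pipeline_id}"
        commands.append(command)
    return commands


if __name__ == "__main__":
    config = {
        "pipelineAccount": {"awsAccountId": "111111111111", "awsRegion": "us-east-1"},
        "devAccount": {"awsAccountId": "222222222222", "awsRegion": "us-east-1"},
        "prodAccount": {"awsAccountId": "333333333333", "awsRegion": "us-east-1"},
    }
    # Hypothetical AWS CLI profile names.
    profiles = {"pipelineAccount": "pipeline", "devAccount": "dev", "prodAccount": "prod"}
    print("\n".join(build_bootstrap_commands(config, profiles)))
```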

Deploy the CI/CD pipeline

Deploy the pipeline stack to your pipeline account:

cdk deploy --profile <PIPELINE-PROFILE>

This command creates:

  • The pipeline stack in the pipeline account
  • The AWS Glue app stack in the development account

Configure AWS developer tools connection to GitHub

To establish a connection between AWS CodePipeline and your GitHub repository, follow these steps:

  1. Create a GitHub connection:
    • In the AWS Management Console for your pipeline account, navigate to AWS CodePipeline
    • In the navigation pane, choose Connections
    • Choose Create connection
    • Select GitHub as the source provider
  2. Authorize the connection:
    • Provide a connection name (such as MyGitHubConnection)
    • Choose Connect to GitHub
    • Follow the prompts to authorize AWS CodePipeline to access your GitHub account
    • Make sure that the connection has access to your forked aws-glue-cdk-baseline repository
  3. Note the connection Amazon Resource Name (ARN):
    • After the connection is established, note the connection ARN because you’ll need it when configuring the pipeline

Integrate the CI/CD pipeline with the AWS Glue Resource Sync Utility

To integrate the AWS Glue Resource Sync Utility into the pipeline to automate the synchronization of AWS Glue visual jobs, follow these steps:

  1. Download the sync.py script from the AWS Glue Samples repository:
wget https://raw.githubusercontent.com/aws-samples/aws-glue-samples/master/utilities/resource_sync/sync.py \
-O aws_glue_cdk_baseline/job_scripts/sync.py
  2. Create a new file aws_glue_cdk_baseline/job_scripts/generate_mapping.py with the following content:
import yaml
import json
 
def generate_mapping():
    with open('default-config.yaml', 'r') as config_file:
        config = yaml.safe_load(config_file)
    mapping = {
        f"s3://aws-glue-assets-{config['devAccount']['awsAccountId']}-{config['devAccount']['awsRegion']}": f"s3://aws-glue-assets-{config['prodAccount']['awsAccountId']}-{config['prodAccount']['awsRegion']}",
        f"arn:aws:iam::{config['devAccount']['awsAccountId']}:role/service-role/AWSGlueServiceRole": f"arn:aws:iam::{config['prodAccount']['awsAccountId']}:role/service-role/AWSGlueServiceRole",
        f"s3://dev-glue-data-{config['devAccount']['awsAccountId']}-{config['devAccount']['awsRegion']}": f"s3://prod-glue-data-{config['prodAccount']['awsAccountId']}-{config['prodAccount']['awsRegion']}"
    }
    with open('mapping.json', 'w') as mapping_file:
        json.dump(mapping, mapping_file, indent=2)
 
if __name__ == "__main__":
    generate_mapping()

This script generates a mapping.json file that the sync.py script will use to synchronize the jobs between the development and production environments. The mapping.json file contains the mapping of the development environment assets to the production environment assets:

  • The s3://aws-glue-assets-* Amazon Simple Storage Service (Amazon S3) bucket contains the AWS Glue Studio visual job definitions
  • The arn:aws:iam::*:role/service-role/AWSGlueServiceRole AWS Identity and Access Management (IAM) role is used by the AWS Glue Studio jobs to access AWS resources
  • The s3://dev-glue-data-* and s3://prod-glue-data-* S3 buckets contain scripts and data used by the AWS Glue Studio jobs
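To illustrate what such a mapping achieves, the following sketch applies a mapping to a serialized job definition with plain string replacement. This is an assumption about the general idea, not a description of how sync.py is implemented, and the account IDs and job fields are hypothetical.

```python
import json


def apply_mapping(serialized: str, mapping: dict) -> str:
    """Replace every development-side value in the text with its production counterpart."""
    # Replace longer keys first so a short key cannot clobber part of a longer one.
    for source in sorted(mapping, key=len, reverse=True):
        serialized = serialized.replace(source, mapping[source])
    return serialized


if __name__ == "__main__":
    mapping = {
        "s3://aws-glue-assets-111111111111-us-east-1": "s3://aws-glue-assets-222222222222-us-east-1",
        "arn:aws:iam::111111111111:role/service-role/AWSGlueServiceRole":
            "arn:aws:iam::222222222222:role/service-role/AWSGlueServiceRole",
    }
    # Hypothetical, simplified job definition from the development account.
    job = {
        "Role": "arn:aws:iam::111111111111:role/service-role/AWSGlueServiceRole",
        "Command": {"ScriptLocation": "s3://aws-glue-assets-111111111111-us-east-1/scripts/orders-etl.py"},
    }
    converted = json.loads(apply_mapping(json.dumps(job), mapping))
    print(json.dumps(converted, indent=2))
```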
  3. Update the aws_glue_cdk_baseline/pipeline_stack.py file to include a step that deserializes the JSON file and deploys the AWS Glue jobs to the production environment:
from typing import Dict
import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_iam as iam
)
from constructs import Construct
from aws_cdk.pipelines import CodePipeline, CodePipelineSource, CodeBuildStep
from aws_glue_cdk_baseline.glue_app_stage import GlueAppStage
 
GITHUB_REPO = "YOUR-GITHUB-USERNAME/aws-glue-cdk-baseline"
GITHUB_BRANCH = "main"
GITHUB_CONNECTION_ARN = "YOUR-GITHUB-CONNECTION-ARN"
 
class PipelineStack(Stack):
 
    def __init__(self, scope: Construct, construct_id: str, config: Dict, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)
 
        source = CodePipelineSource.connection(
            GITHUB_REPO,
            GITHUB_BRANCH,
            connection_arn=GITHUB_CONNECTION_ARN
        )
 
        pipeline = CodePipeline(self, "GluePipeline",
            pipeline_name="GluePipeline",
            cross_account_keys=True,
            docker_enabled_for_synth=True,
            synth=CodeBuildStep("CdkSynth",
                input=source,
                install_commands=[
                    "pip install -r requirements.txt",
                    "pip install -r requirements-dev.txt",
                    "npm install -g aws-cdk",
                ],
                commands=[
                    "cdk synth",
                ]
            )
        )
 
        # Add development stage
        dev_stage = GlueAppStage(self, "DevStage", config=config, stage="dev", 
            env=cdk.Environment(
                account=str(config['devAccount']['awsAccountId']),
                region=config['devAccount']['awsRegion']
            ))
        pipeline.add_stage(dev_stage)

        # Add production stage
        prod_stage = GlueAppStage(self, "ProdStage", config=config, stage="prod", 
            env=cdk.Environment(
                account=str(config['prodAccount']['awsAccountId']),
                region=config['prodAccount']['awsRegion']
            ))
        pipeline.add_stage(prod_stage)
 
        # Glue Resource Sync as a separate step in the pipeline
        pipeline.add_wave("GlueJobSync").add_post(CodeBuildStep("GlueJobSync",
            input=source,
            commands=[
                "python $(pwd)/aws_glue_cdk_baseline/job_scripts/generate_mapping.py",
                "python aws_glue_cdk_baseline/job_scripts/sync.py "
                   "--dst-role-arn arn:aws:iam::{0}:role/GlueCrossAccountRole-prod "
                   "--dst-region {1} "
                   "--deserialize-from-file aws_glue_cdk_baseline/resources/resources.json "
                   "--config-path mapping.json "
                   "--targets job,catalog "
                   "--skip-prompt".format(
                       config['prodAccount']['awsAccountId'],
                       config['prodAccount']['awsRegion']
                   ),
            ],
            role_policy_statements=[
                iam.PolicyStatement(
                    actions=[
                        "sts:AssumeRole",
                    ],
                    resources=["*"]
                )
            ]
        ))

Replace the placeholders in the pipeline_stack.py file with your values:

  • GITHUB_REPO with the name of your GitHub repository
  • GITHUB_BRANCH with the name of the branch you want to use for the pipeline
  • GITHUB_CONNECTION_ARN with the ARN of the GitHub connection you created in Step 4
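The GlueJobSync step builds its sync.py command from adjacent string literals and a single format call. The following standalone sketch reproduces that construction so you can see the command that the step ends up running. The helper name and the sample account ID and Region are hypothetical.

```python
def build_sync_command(account_id: str, region: str) -> str:
    """Render the sync.py command line used to deploy jobs to production."""
    # Adjacent string literals are joined into one string before format() is applied.
    return (
        "python aws_glue_cdk_baseline/job_scripts/sync.py "
        "--dst-role-arn arn:aws:iam::{0}:role/GlueCrossAccountRole-prod "
        "--dst-region {1} "
        "--deserialize-from-file aws_glue_cdk_baseline/resources/resources.json "
        "--config-path mapping.json "
        "--targets job,catalog "
        "--skip-prompt"
    ).format(account_id, region)


if __name__ == "__main__":
    print(build_sync_command("222222222222", "us-east-1"))
```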
  4. Update the aws_glue_cdk_baseline/glue_app_stack.py file to create a cross-account role with the necessary permissions to access the development environment resources:
    # Inside the stack's __init__ method: create a role that the pipeline account can assume
    self.cross_account_role = self.create_cross_account_role(
        f"GlueCrossAccountRole-{stage}",
        str(config['pipelineAccount']['awsAccountId'])
    )

    # Stack method that builds the role and trusts the given account
    def create_cross_account_role(self, role_name: str, trusted_account_id: str):
        return iam.Role(self, f"{role_name}CrossAccountRole",
            role_name=role_name,
            assumed_by=iam.AccountPrincipal(trusted_account_id),
            managed_policies=[iam.ManagedPolicy.from_aws_managed_policy_name("AdministratorAccess")]
        )

    # Expose the role ARN on the stack
    @property
    def cross_account_role_arn(self):
        return self.cross_account_role.role_arn

    # This second property reads the ARN from self.glue_app_stack, so it presumably
    # belongs in the class that wraps the stack (GlueAppStage), not in the stack itself
    @property
    def cross_account_role_arn(self):
        return self.glue_app_stack.cross_account_role_arn

Check the andreimaksimov/aws-glue-cdk-baseline repository for a complete diff.

  5. Commit your changes to the repository:
git add aws_glue_cdk_baseline/job_scripts/sync.py
git add aws_glue_cdk_baseline/job_scripts/generate_mapping.py
git add aws_glue_cdk_baseline/pipeline_stack.py
git add aws_glue_cdk_baseline/glue_app_stack.py

git commit -m "Integrate Glue Resource Sync Utility into the pipeline"

git push

Day-to-day development (repeated)

With the initial setup complete, you can now proceed with your regular development activities. This section outlines the steps you’ll repeat during your day-to-day work to develop, version control, and deploy AWS Glue visual jobs.

Create visual jobs in the development account

In this step, you’ll use AWS Glue Studio to create and configure your visual jobs within the development environment.

  1. In your development account, open the AWS Glue Studio console
  2. To create a new visual job, choose Create job
  3. Choose Visual with a blank canvas and use the visual editor to design your ETL job
  4. Configure the job settings:
    • Job name: Provide a meaningful name
    • IAM role: Select an IAM role with necessary permissions
    • Other configurations: Adjust as needed
  5. To save the job, choose Save

Repeat these steps to create additional jobs as required.

Serialize visual jobs

To serialize your visual jobs to enable version control and preparation for deployment, follow these steps:

  1. Run the AWS Glue Resource Sync Utility from the root of the repository:
python aws_glue_cdk_baseline/job_scripts/sync.py \
  --src-role-arn arn:aws:iam::<DEV-ACCOUNT-NUMBER>:role/GlueCrossAccountRole-dev \
  --src-region <DEV-REGION> \
  --serialize-to-file aws_glue_cdk_baseline/resources/resources.json \
  --targets job,catalog \
  --skip-prompt
    • Replace <DEV-ACCOUNT-NUMBER> with your development account number
    • Replace <DEV-REGION> with your development Region (for example, us-east-1)
  2. Verify the serialized file:
    • Locate the resources.json file in aws_glue_cdk_baseline/resources/
    • Make sure it contains the definitions of your visual jobs
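One way to check the serialized output is to search it for objects that carry a visual DAG. The following sketch assumes that visual jobs keep the CodeGenConfigurationNodes field used by the AWS Glue job API. The overall file layout (the jobs key in the sample) is a guess, which is why the search is recursive rather than tied to a fixed structure.

```python
def find_visual_jobs(node) -> list[str]:
    """Recursively collect names of objects that carry a visual DAG definition."""
    names = []
    if isinstance(node, dict):
        if "Name" in node and "CodeGenConfigurationNodes" in node:
            names.append(node["Name"])
        for value in node.values():
            names.extend(find_visual_jobs(value))
    elif isinstance(node, list):
        for item in node:
            names.extend(find_visual_jobs(item))
    return names


if __name__ == "__main__":
    # Hypothetical sample; in practice, load the serialized file with json.load instead.
    sample = {
        "jobs": [
            {"Name": "orders-etl", "CodeGenConfigurationNodes": {"node-1": {}}},
            {"Name": "legacy-script-job"},
        ]
    }
    print(find_visual_jobs(sample))  # prints ['orders-etl']
```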

Commit changes to Git repository

To commit changes to the Git repository, follow these steps:

  1. Add the serialized resources to Git:
git add aws_glue_cdk_baseline/resources/resources.json
  2. Commit your changes:
git commit -m "Add serialized Glue Visual Jobs"
  3. Push to GitHub:
git push

This action triggers the CI/CD pipeline.

Deploy visual jobs to production

The CI/CD pipeline automatically performs the following steps:

  • Synthesizes the AWS CDK application
  • Deploys to the development environment
  • Deploys to the production environment
  • Runs the AWS Glue Resource Sync Utility

The following screenshot shows the CI/CD pipeline.


Verify visual jobs in production

After the pipeline has completed the deployment, it’s important to verify that the visual jobs are correctly reflected in the production environment. To do so, follow these steps:

  1. In the production account, open the AWS Glue Studio console
  2. Verify the deployed jobs:
    • Make sure that the visual jobs are present
    • Open each job to confirm that the visual DAGs are preserved
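The same verification can be scripted. The following sketch checks whether a deployed job still carries its visual DAG by looking for the CodeGenConfigurationNodes field in the GetJob response. The job names are hypothetical, and a small stand-in client is used so the example runs without AWS access. With boto3, you would pass boto3.client("glue") created with production credentials instead.

```python
def has_visual_dag(glue_client, job_name: str) -> bool:
    """Return True if the job definition carries a visual DAG."""
    job = glue_client.get_job(JobName=job_name)["Job"]
    return bool(job.get("CodeGenConfigurationNodes"))


class FakeGlueClient:
    """Stand-in for boto3.client("glue") so the sketch runs without AWS access."""

    def __init__(self, jobs: dict):
        self._jobs = jobs

    def get_job(self, JobName: str) -> dict:
        return {"Job": self._jobs[JobName]}


if __name__ == "__main__":
    client = FakeGlueClient({
        "orders-etl": {"Name": "orders-etl", "CodeGenConfigurationNodes": {"node-1": {}}},
        "legacy-script-job": {"Name": "legacy-script-job"},
    })
    for name in ("orders-etl", "legacy-script-job"):
        print(name, has_visual_dag(client, name))
```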

By following these steps in your day-to-day workflow, you make sure that your AWS Glue visual jobs are version-controlled, consistent across environments, and that your production environment reflects the latest tested changes.

Version control for AWS Glue visual jobs

By serializing AWS Glue Studio visual jobs to JSON files and committing them to a Git repository, you enable version control for your data integration workflows. By following this approach you can:

  • Track changes – Monitor modifications to your AWS Glue jobs over time
  • Collaborate – Work with team members on developing and refining jobs
  • Restore and deploy – Easily restore jobs in other accounts or environments

The serialization and deserialization steps are integral to your development and deployment process, making sure that all changes are captured and seamlessly propagated.

Conclusion

By combining the AWS Glue Visual Job API, AWS Glue Resource Sync Utility, and an AWS CDK based CI/CD pipeline, we’ve crafted a comprehensive solution for managing AWS Glue Studio visual jobs across different environments. This integrated approach offers several benefits:

  • Version control integration – Manage and track changes to your AWS Glue visual jobs using Git, enabling collaboration and change tracking
  • Streamlined development – Easily develop and test AWS Glue jobs using the Visual Editor in the development environment
  • Automated deployment – Use a CI/CD pipeline to automatically deploy serialized visual DAGs to the production environment
  • Environment consistency – Promote consistency across development and production environments by using the same job definitions
  • Visual representation preservation – Maintain the visual DAG representation when synchronizing jobs between environments

This solution empowers data engineers to focus on building robust data integration pipelines while automating the complexities of managing and deploying AWS Glue Studio visual jobs across multiple environments.

We encourage you to try this solution and adapt it to your needs. As always, we welcome your feedback and suggestions for further improvements.


About the Authors

Andrei Maksimov is an AWS Senior Cloud Infrastructure Architect specializing in cloud infrastructure, software development, and DevOps. He designs and implements scalable, secure, and efficient cloud solutions and helps customers optimize their cloud environments. Outside of work, Andrei enjoys participating in hackathons, contributing to open source projects, and exploring the latest advancements in AI. You can connect with him on LinkedIn.

David Zhang is an AWS Data Architect specializing in designing and implementing analytics infrastructure, data management, ETL, and extensive data systems. He helps customers modernize their AWS data platforms. David is also an active speaker at AWS conferences and a contributor to technical content and open source initiatives. In his free time, he enjoys volleyball, tennis, and weightlifting. Feel free to connect with him on LinkedIn.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for designing AWS features, implementing software artifacts, and helping with customer architectures. In his spare time, he enjoys watching anime on Prime Video. You can connect with him on LinkedIn.

Accelerate SQL code migration from Google BigQuery to Amazon Redshift using BladeBridge

Post Syndicated from Ritesh Sinha original https://aws.amazon.com/blogs/big-data/accelerate-sql-code-migration-from-google-bigquery-to-amazon-redshift-using-bladebridge/

Accelerating SQL code migration from Google BigQuery to Amazon Redshift can be a complex and time-consuming task. Businesses often struggle to efficiently translate their existing BigQuery code to Amazon Redshift, which can delay critical data modernization initiatives. However, with the right tools and approach, this migration process can be significantly streamlined.

This post explores how you can use BladeBridge, a leading data environment modernization solution, to simplify and accelerate the migration of SQL code from BigQuery to Amazon Redshift. BladeBridge offers a comprehensive suite of tools that automate much of the complex conversion work, allowing organizations to quickly and reliably transition their data analytics capabilities to the scalable Amazon Redshift data warehouse. BladeBridge provides a configurable framework to seamlessly convert legacy metadata and code into more modern services such as Amazon Redshift.

Amazon Redshift is a fully managed data warehouse service offered by Amazon Web Services (AWS). Tens of thousands of customers use Amazon Redshift every day to run analytics, processing exabytes of data for business insights. Whether your growing data is stored in operational data stores, data lakes, streaming data services, or third-party datasets, Amazon Redshift helps you securely access, combine, and share data with minimal movement or copying. Amazon Redshift is built for scale and delivers up to 7.9 times better price performance than other cloud data warehouses.

By using the BladeBridge Analyzer and BladeBridge Converter tools, organizations can significantly reduce the time and effort required to migrate BigQuery code to Amazon Redshift. The Analyzer provides detailed assessments of the complexity and requirements for the migration, and the Converter automates the actual code conversion process, using pattern-based customizable rules to streamline the transition.

In this post, we walk through the step-by-step process of using BladeBridge to accelerate the migration of BigQuery SQL code to Amazon Redshift.

Solution overview

The BladeBridge solution is composed of two key components: the BladeBridge Analyzer and the BladeBridge Converter.

BladeBridge Analyzer

The Analyzer is designed to thoroughly assess the complexities of the existing data environment, in this case, Google BigQuery. After assessment of the source SQL files, it generates a comprehensive report that provides valuable insights into the migration effort. The Analyzer report includes the following:

  • Summary of the total number of SQL scripts, file scripts, data definition language (DDL) statements, and other key metrics
  • Categorization of the SQL code complexity into levels such as low, medium, complex, and very complex
  • Insights that help both the organizations and systems integrators prepare more accurate project estimates and migration plans

BladeBridge Converter

The Converter is a pattern-based automation tool that streamlines the actual code conversion process from BigQuery to Amazon Redshift. The Converter uses a set of predefined conversion rules and patterns to automatically translate 70–95% of the legacy SQL code. This significantly reduces the manual effort required by developers. The Converter works by doing the following:

  • Parsing the source SQL files and analyzing the code semantically
  • Applying the appropriate translation rules and patterns to convert source database code to the target, in this case, Google BigQuery to Amazon Redshift

The out-of-the-box code handles most conversions. The Converter allows developers to customize the conversion patterns for more complex transformations.

The following is the migration procedure:

  1. Prepare SQL files
  2. Using BladeBridge Analyzer, create an analyzer report
  3. Purchase license keys for converter
  4. Using BladeBridge Converter, convert SQL files

The following diagram illustrates these steps.

Prerequisites

You need the following prerequisites to implement the solution:

  • An AWS account
  • An Amazon Redshift provisioned cluster or Amazon Redshift serverless workgroup
  • An Amazon Elastic Compute Cloud (Amazon EC2) instance, on-premises server, or desktop or laptop with the following requirements:
    • macOS, Windows 7 or higher (32-bit or 64-bit), or Linux (Red Hat, Ubuntu, or a similar distribution)
    • A minimum of 8 GB RAM is recommended
  • Visit the BladeBridge community portal and sign up to create your account. The portal gives you access to a comprehensive suite of resources, including the BladeBridge Analyzer, Converter, and other training materials. This post contains some links that are only accessible to registered members of the BladeBridge community portal.
  • Contact BladeBridge through Request demo and obtain an Analyzer key for your organization.

Solution walkthrough

Follow these solution steps:

Prepare SQL files

For SQL data warehouses such as BigQuery, code preparation starts by exporting the SQL files out of the data warehouse solution. If your BigQuery SQL code is stored in a single file containing multiple database objects, you need to split it into individual files before using the BladeBridge tools to convert the code to Amazon Redshift. To do this, you can use the BladeBridge SQL File Splitter utility.

The BladeBridge conversion process is optimized to work with each database object (for example, tables, views, and materialized views) and code object (for example, stored procedures and functions) stored in its own separate SQL file. This allows the BladeBridge Analyzer to scan each file individually, gaining a comprehensive understanding of the code patterns, complexity, and structure. To use the BladeBridge SQL File Splitter utility, follow these steps:

  1. Log in to the BladeBridge portal and download the SQL File Splitter utility for your operating system.
  2. Create an input file directory and place your BigQuery SQL code files in the directory.
  3. Create an empty output file directory. The files generated by the splitter will be stored here.
  4. Navigate to the directory where you downloaded the bbsqlsplit executable file and run the following command in your terminal (Mac or Linux) or command prompt (Windows), replacing the input and output file directory paths:

Syntax

bbsqlsplit -d <<input file directory path>> -o <<output file directory path>> -E sql

Example

bbsqlsplit -d C:\Users\XXXXX\Desktop\BladeBridge\SplitFilesUtility\source -o C:\Users\XXXXX\Desktop\BladeBridge\SplitFilesUtility\splitFiles -E sql

For more options of the bbsqlsplit command, refer to the SQL file split documentation in the BladeBridge community portal.
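
If you script this step, the preparation and the command line can be assembled in a few lines of Python. The following is a minimal sketch, not part of the BladeBridge tooling: the function name build_split_command is hypothetical, and it uses only the -d, -o, and -E options shown above.

```python
from pathlib import Path


def build_split_command(input_dir, output_dir, extension="sql", executable="bbsqlsplit"):
    """Return the bbsqlsplit argument list using only the -d, -o, and -E options."""
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    if not input_path.is_dir():
        raise FileNotFoundError(f"Input directory not found: {input_path}")
    # The splitter writes its results here, so create the directory if it is missing.
    output_path.mkdir(parents=True, exist_ok=True)
    return [executable, "-d", str(input_path), "-o", str(output_path), "-E", extension]


# Usage (hypothetical paths): run from the directory that contains the executable.
# import subprocess
# subprocess.run(build_split_command("source", "splitFiles"), check=True)
```

Passing the arguments as a list avoids shell quoting problems when a directory path contains unusual characters.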

Using BladeBridge Analyzer, create an analyzer report

The Analyzer provides a detailed assessment of the existing BigQuery code, generating a comprehensive report that outlines the complexity and requirements for the migration to Amazon Redshift.

To run the BladeBridge Analyzer, follow these steps:

  1. Log in to the BladeBridge portal and navigate to the Analyzer download page.
  2. Download the Analyzer executable file for your operating system (for example, bbanalyzer.exe for Windows, bbanalyzer.gz for Linux and macOS). On macOS and Linux, decompress the downloaded gzip file.
  3. Download the configuration file (general_sql_specs.json) from the BladeBridge community portal, as shown in the following screenshot.
  4. On the BladeBridge community portal, choose Assets. This page should display the Analyzer key for your organization.
  5. From the Assets page, download the Analyzer key, as shown in the following screenshot.

In the directory where you downloaded the bbanalyzer executable file, run the following command in your terminal (Mac or Linux) or command prompt (Windows), replacing the necessary paths.

Syntax

bbanalyzer -c <<path to your analyzer key>> -t SQL -d <<path to your source code directory>> -r <<name for the output report>>.xlsx

Example

bbanalyzer -c C:\Users\XXXXX\Desktop\BladeBridge\analyzer_key.txt -t SQL -d C:\Users\XXXXX\Desktop\BladeBridge\SplitFilesUtility\splitFiles -r analyzer_report.xlsx

After running the command successfully, the Analyzer generates a report. Review the report thoroughly, because it provides a summary and in-depth explanations of the SQL analysis. The summary sheet, shown in the following image, provides an overview of the migration, including the number of total SQL scripts, file scripts, and DDLs. Each SQL script is categorized into LOW, MEDIUM, COMPLEX, or VERY_COMPLEX complexities, which are determined by the Analyzer Complexity Determination Algorithm. The summary will also help with understanding the overall complexity and migration effort before performing the actual conversion.

If you observe an error when running BladeBridge Analyzer, review the following troubleshooting tips:

  • Configure the execute permission – You may need to add the execute permission to the Analyzer executable file. On Mac and Linux, run chmod 755 ./bbanalyzer to modify the permission.
  • Allow running third-party software – Because BladeBridge Analyzer is third-party software, macOS may raise a warning or an error when running the Analyzer. If you’re using a Mac, follow the instructions in Open a Mac app from an unidentified developer.
  • Use local drive – In some cases, you might encounter an error if the executable is located in a network drive. We recommend that you run the executable on the local drive.
  • Don’t include whitespace in the path – Make sure the path to the executable doesn’t contain a directory with spaces in the directory name.

For more details, refer to the BladeBridge Analyzer Demo.

Purchase license keys for converter

To use the BladeBridge Converter and automate the code translation process, you need to purchase the necessary license keys. These license keys are tied to the specific SQL files you are converting, making sure that updates to the source code require the appropriate license.

To obtain the license keys, follow these steps:

  1. Share the output of the BladeBridge Analyzer report and the provided pricing calculator Excel sheet with BladeBridge.
  2. The BladeBridge team will review the information and provide you with the required license keys to run the Converter.

The license key is tied to the file hash of the SQL files you are converting. If you make updates to the source SQL files, you need to purchase new license keys to convert the modified code. Therefore, make sure to purchase the necessary license keys and manage your files with a version control system to have smooth transitions when converting your BigQuery SQL code to Amazon Redshift.
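
Because a changed file needs a new key, it can help to record a fingerprint of each SQL file at the time you request licenses and compare against it later. The following is a minimal sketch under stated assumptions: the post does not say which hash algorithm BladeBridge uses, so SHA-256 here is only a stand-in for change detection, and the function names are hypothetical.

```python
import hashlib
from pathlib import Path


def hash_sql_files(directory):
    """Map each .sql file name in the directory to the SHA-256 digest of its contents."""
    return {
        path.name: hashlib.sha256(path.read_bytes()).hexdigest()
        for path in sorted(Path(directory).glob("*.sql"))
    }


def changed_files(licensed_manifest, current_manifest):
    """Return files that are new or whose contents differ from the licensed snapshot."""
    return sorted(
        name
        for name, digest in current_manifest.items()
        if licensed_manifest.get(name) != digest
    )
```

Store the first manifest (for example, as JSON in your version control system) when you send the Analyzer report to BladeBridge, then call changed_files before each Converter run to see which files were modified after licensing.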

Using BladeBridge Converter, convert SQL files

The Converter uses the predefined conversion rules that are available in the out-of-the-box configuration files to automatically translate 70–95% of the legacy code, significantly reducing manual effort for your development team. The out-of-the-box configuration file handles conversion for common code patterns from Google BigQuery to Amazon Redshift. For those custom patterns that aren’t covered by an out-of-the-box configuration file, you can create custom conversion rules by creating additional configuration files.

Follow these steps to run the BladeBridge Converter:

  1. Log in to the BladeBridge portal and, on the Converter downloads page, download the Converter executable file for your operating system (sqlconv.exe for Windows or sqlconv.gz for Mac or Linux).
  2. From the same page, download the configuration file (general_sql_specs.json).
  3. Create an output directory where the converted files will be saved.
  4. In the folder where you downloaded the Converter executable sqlconv, run the following command.

Syntax

sqlconv -c <<converter license file name obtained from BladeBridge>> -d <<input folder for SQL files>> -n <<output folder for converted files>> -u <<path to the config file(s); provide at least one file>>

Example

sqlconv -c converter_license.txt -d C:\Users\XXXXX\Desktop\BladeBridge\SplitFilesUtility\splitFiles -n C:\Users\XXXXX\Desktop\BladeBridge\SplitFilesUtility\cnvrtdFiles -u bq2redshift.json

  5. Run the generated SQL files in your Amazon Redshift data warehouse. If you encounter errors, analyze them and determine if custom conversion rules, not already covered in the out-of-the-box configuration files, need to be applied.
  6. If custom conversion rules are needed, create a new configuration file following the guidelines in the Customize configuration file section. Provide the new config file name in the -u option and rerun the Converter.
  7. Repeat these steps until all files are converted successfully or manually modified.

Customize configuration file

Customizing a configuration file is an iterative process that can help automate the conversion of patterns that recur across your codebase. However, if a pattern appears in only a few files and a few places, converting it manually may be less effort than writing a rule.

The configuration is defined in a JSON file. There is a general configuration file with common rules and custom configuration files for each client with client-specific rules. Rules can be added to the general configuration file if they are applicable for all clients. For client-specific rules, a separate JSON file should be created and referenced. This keeps the general rules clean and organized.

The conversion rules in BladeBridge’s configuration file fall into one of three categories:

  1. Line substitution
  2. Block substitution
  3. Function substitution

The Converter treats every line ending with a semicolon (;) as the end of a statement. You can also configure other statement breakers. Refer to this BladeBridge documentation to get more details on SQL and expression conversion.

The following are considerations while using the customized configuration:

  • Nested functions in BigQuery allow for complex operations within a single SQL statement, which may need to be broken down into multiple steps in Amazon Redshift
  • Array functions in BigQuery provide capabilities for manipulating and transforming array data types, which may require alternative approaches in Amazon Redshift
  • You need to carefully analyze the requirements and implement workarounds or alternative solutions when migrating from BigQuery to Amazon Redshift, especially for advanced functionality not directly supported in Amazon Redshift

Line substitution

Line substitution applies regular expressions to each line of code. Each rule has a from clause, which holds the regular expression to match, and a to clause, which holds the replacement text. The statement_categories setting limits a line substitution to specific statement types, such as DDL or procedures. For example:

  • The first expression in the following code example replaces the regular expression pattern ROWNUM with the SQL expression row_number() over (order by 1)
  • The second expression in the following code example replaces the regular expression pattern SYSDATE with the SQL expression CURRENT_TIMESTAMP.
"line_subst" : [
{"from" : "\bROWNUM\b", "to" : "row_number() over (order by 1)"},
{"from" : "SYSDATE", "to" : "CURRENT_TIMESTAMP"}
]
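
To see what these two rules do to a statement, you can reproduce their effect with Python's re module. This is a minimal sketch of the regular expression semantics only, not the Converter's implementation; the function name apply_line_subst is hypothetical, and case-insensitive matching is an assumption.

```python
import re

# Rules copied from the line_subst example; "from" is a regular expression.
LINE_SUBST = [
    {"from": r"\bROWNUM\b", "to": "row_number() over (order by 1)"},
    {"from": r"SYSDATE", "to": "CURRENT_TIMESTAMP"},
]


def apply_line_subst(sql_text, rules):
    """Apply every rule to every line, in the order the rules are listed."""
    converted = []
    for line in sql_text.splitlines():
        for rule in rules:
            # Assumption: matching is case-insensitive.
            # The lambda keeps the "to" text literal (no escape processing).
            line = re.sub(
                rule["from"],
                lambda _match, to=rule["to"]: to,
                line,
                flags=re.IGNORECASE,
            )
        converted.append(line)
    return "\n".join(converted)


print(apply_line_subst("SELECT ROWNUM AS rn, SYSDATE\nFROM orders", LINE_SUBST))
```

The \b word boundaries in the first rule keep it from rewriting identifiers that merely contain ROWNUM, such as MYROWNUMBER. The second rule has no boundaries, so it would also match inside a longer identifier.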

Block substitution

Block substitution applies regular expressions across multiple lines. It targets statements that stretch over several lines, which are generally more complex than the cases handled by line substitution. The following example replaces an anonymous block that runs dynamic SQL and ignores exceptions with a call to a stored procedure. In this example, the procedure is created in the target database.

BEGIN
EXECUTE IMMEDIATE('SQL Statement');
EXCEPTION WHEN OTHERS
THEN
NULL
END;

To

CALL SP_DYN_SQL('parameters');

"block_subst" : [
{"from": "BEGIN(.*?)execute immediate(.*);.*exception\s*when\s*others\s*then\s*null(.*?)end;(.*)", "to": "CALL sp_dyn_sql($2);"}
]
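
The rule above can be tried out with Python's re module to check what each capture group picks up. This is a minimal sketch, not the Converter's engine: it assumes that the pattern is matched case-insensitively, that the dot matches line breaks, and that $2 refers to the second capture group. The table name in the sample block is hypothetical.

```python
import re

# Rule copied from the block_subst example.
BLOCK_RULE = {
    "from": r"BEGIN(.*?)execute immediate(.*);.*exception\s*when\s*others\s*then\s*null(.*?)end;(.*)",
    "to": "CALL sp_dyn_sql($2);",
}


def apply_block_subst(sql_text, rule):
    """Replace a multi-line block, mapping $N placeholders to capture groups."""
    # Translate $N into Python's \g<N> group reference syntax.
    replacement = re.sub(r"\$(\d+)", r"\\g<\1>", rule["to"])
    # DOTALL lets "." cross line breaks; IGNORECASE is an assumption.
    return re.sub(rule["from"], replacement, sql_text, flags=re.DOTALL | re.IGNORECASE)


source_block = """BEGIN
EXECUTE IMMEDIATE('DROP TABLE staging_orders');
EXCEPTION WHEN OTHERS
THEN
NULL
END;"""

print(apply_block_subst(source_block, BLOCK_RULE))
# prints: CALL sp_dyn_sql(('DROP TABLE staging_orders'));
```

Because the second group captures everything between EXECUTE IMMEDIATE and the semicolon, the parentheses of the original call are carried into the output. Testing a pattern this way before adding it to a configuration file makes such details visible early.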

Function substitution

Function substitution allows replacing one function with an equivalent function in the target data warehouse. The configuration also allows for specifying custom functions.

Function substitution points to an array of instructions responsible for altering function calls. Use this section when a function must be translated or when its arguments (the function signature) have to change. The following rule converts the NVL2 function to a CASE expression on Amazon Redshift, where $1, $2, and $3 stand for the first, second, and third arguments of the original call.

"function_subst" : [
{"from": "NVL2", "output_template" : "CASE WHEN $1 IS NOT NULL THEN $2 ELSE $3 END"}
]
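
The effect of this rule can be illustrated with a small Python function that finds each call, splits its arguments, and fills in the template. This is a minimal sketch of the idea, not the Converter's implementation: the function names are hypothetical, and it doesn't handle parentheses inside string literals or unbalanced calls.

```python
import re

# Rule copied from the function_subst example.
NVL2_RULE = {"from": "NVL2", "output_template": "CASE WHEN $1 IS NOT NULL THEN $2 ELSE $3 END"}


def split_args(arg_text):
    """Split on top-level commas, ignoring commas inside parentheses or single quotes."""
    args, current, depth, in_quote = [], [], 0, False
    for ch in arg_text:
        if ch == "'":
            in_quote = not in_quote
        if not in_quote:
            if ch == "(":
                depth += 1
            elif ch == ")":
                depth -= 1
            elif ch == "," and depth == 0:
                args.append("".join(current).strip())
                current = []
                continue
        current.append(ch)
    args.append("".join(current).strip())
    return args


def substitute_function(sql, rule):
    """Rewrite every call to rule['from'] using rule['output_template']."""
    pattern = re.compile(r"\b" + re.escape(rule["from"]) + r"\s*\(", re.IGNORECASE)
    out, pos = [], 0
    while True:
        match = pattern.search(sql, pos)
        if not match:
            out.append(sql[pos:])
            return "".join(out)
        # Walk forward to the parenthesis that closes this call.
        depth, end = 1, match.end()
        while end < len(sql) and depth > 0:
            if sql[end] == "(":
                depth += 1
            elif sql[end] == ")":
                depth -= 1
            end += 1
        args = split_args(sql[match.end():end - 1])
        # Replace $1, $2, ... with the corresponding argument text.
        rendered = re.sub(r"\$(\d+)", lambda m: args[int(m.group(1)) - 1], rule["output_template"])
        out.append(sql[pos:match.start()])
        out.append(rendered)
        pos = end


print(substitute_function("SELECT NVL2(bonus, salary + bonus, salary) FROM emp", NVL2_RULE))
# prints: SELECT CASE WHEN bonus IS NOT NULL THEN salary + bonus ELSE salary END FROM emp
```

Tracking parenthesis depth matters because arguments are often function calls themselves, for example NVL2(COALESCE(a, b), 1, 0).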

Conclusion

In this post, we demonstrated how to use the BladeBridge Analyzer and BladeBridge Converter to streamline the migration of SQL code from Google BigQuery to Amazon Redshift. By using BladeBridge, organizations can significantly reduce the time and effort required to translate their existing BigQuery code for migration to the Amazon Redshift data warehouse. The Analyzer provides a detailed assessment of the source SQL code, and the Converter automates the actual conversion process using a set of predefined, customizable rules and patterns.

We also covered the customization capabilities of the BladeBridge solution, showcasing how you can tailor the conversion rules to handle more complex transformations. By using the line substitution, block substitution, and function substitution features, you can have a seamless migration that addresses the unique requirements of your data analytics infrastructure.

We encourage you to try out BladeBridge’s GCP BigQuery to Amazon Redshift solution and explore the various configuration options. If you encounter any challenges or have additional requirements, refer to the BladeBridge community support portal or reach out to the BladeBridge team for further assistance.


About the authors

Ritesh Kumar Sinha is an Analytics Specialist Solutions Architect based out of San Francisco. He has helped customers build scalable data warehousing and big data solutions for over 16 years. He loves to design and build efficient end-to-end solutions on AWS. In his spare time, he loves reading, walking, and doing yoga.

Anusha Challa is a Senior Analytics Specialist Solutions Architect focused on Amazon Redshift. She has helped many customers build large-scale data warehouse solutions in the cloud and on premises. She is passionate about data analytics and data science.

Yota Hamaoka is an Analytics Solution Architect at Amazon Web Services. He is focused on driving customers to accelerate their analytics journey with Amazon Redshift.

Milind Oke is a Data Warehouse Specialist Solutions Architect based out of New York. He has been building data warehouse solutions for over 15 years and specializes in Amazon Redshift.

Raza Hafeez is a Senior Product Manager at Amazon Redshift. He has over 13 years of professional experience building and optimizing enterprise data warehouses and is passionate about enabling customers to realize the power of their data. He specializes in migrating enterprise data warehouses to AWS Modern Data Architecture.

Let’s Architect! Modern data architectures

Post Syndicated from Luca Mezzalira original https://aws.amazon.com/blogs/architecture/lets-architect-modern-data-architectures-2/

Data is the fuel for AI; modern data is even more important for generative AI and advanced data analytics, producing more accurate, relevant, and impactful results. Modern data comes in various forms: real-time, unstructured, or user-generated. Each form requires a different solution. AWS’s data journey began with Amazon Simple Storage Service (Amazon S3) in 2006, marking the start of cloud-based data storage at scale. Since then, AWS has expanded its data offerings to cover the entire data lifecycle, offering a comprehensive ecosystem of services designed to harness the full potential of modern data, from ingestion and storage to processing and analysis, supporting the entire lifecycle of AI-driven innovation.

In this blog post, we will cover some AWS use cases for modern data architectures, showing how AWS enables organizations to leverage the power of data and generative AI technologies.

Key considerations when choosing a database for your generative AI applications

This blog post focuses on selecting the right database for generative AI applications and provides knowledge that can enhance your understanding, guide your decision making, and ultimately lead to more successful AI projects. Selecting the right database for generative AI applications is not just about storage; it significantly impacts performance, scalability, ease of integration, and overall effectiveness of the AI solution.

Figure 1. Diagram that shows the key steps in a RAG workflow

Take me to this blog

Strategies for building a data mesh-based enterprise solution on AWS

Adopting a data mesh architecture can enhance an organization’s ability to manage data effectively, leading to improved performance, innovation, and overall business success. In this guidance, you will discover some strategies to build data mesh solutions on AWS.

Screenshot showing the AWS Prescriptive Guidance data mesh strategies page

Figure 2. The data mesh organizes data into domains, where data are seen as quality products to expose for consumption

Take me to this guidance

Optimizing storage price and performance with Amazon S3

Amazon S3 is an object storage service that supports multiple use cases, including data architectures. Big data pipelines can use Amazon S3 to store input, output, and intermediate results. Machine learning systems use Amazon S3 to process application logs and build the datasets both for experimentation and for production model training. Given the importance of the service and the number of use cases that a foundational storage service can support, we want to share best practices, performance optimization, and cost optimization strategies to work with Amazon S3. This video shows how Anthropic builds its data architecture around Amazon S3.

Storage class comparison chart showing classes of Amazon S3 options

Figure 3. Workloads with predictable patterns often have low retrieval rates for long periods of time afterward, so we can design them to adopt cheaper storage classes

Take me to this video

If you are curious about the underlying architecture of Amazon S3 and want to drill down into its internal design, you can watch the re:Invent video Dive deep on Amazon S3.

How HPE Aruba Supply Chain optimized cost and performance by migrating to an AWS modern data architecture

This is an AWS case study on how HPE Aruba Supply Chain successfully re-architected and deployed their data solution by adopting a modern data architecture on AWS. The new solution has helped Aruba integrate data from multiple sources, along with optimizing their cost, performance, and scalability. This has also allowed the Aruba Supply Chain leadership to receive in-depth and timely insights for better decision-making, thereby elevating the customer experience.

Figure 4. Reference architecture diagram showing HPE Aruba Supply Chain’s architecture, featuring Amazon S3

Take me to this blog

AWS Modern Data Architecture Immersion Day

This workshop highlights the advantages of adopting a modern data architecture on AWS. By integrating the flexibility of a data lake with specialized analytics services, organizations can significantly enhance their data-driven decision-making capabilities. We encourage everyone to explore how this architecture can streamline their analytics processes and support diverse use cases, from real-time insights to advanced machine learning. It’s an excellent opportunity to leverage modern data architecture.

Diagram showing AWS services in a flywheel

Figure 5. Data architectures are fundamental to power use cases ranging from analytics to machine learning

Take me to this workshop

See you next time!

Thanks for reading! In the next blog, we will cover some tips on how to get the best out of your developer experience on AWS. To revisit any of our previous posts or explore the entire series, visit the Let’s Architect! page.

Modernize your legacy databases with AWS data lakes, Part 2: Build a data lake using AWS DMS data on Apache Iceberg

Post Syndicated from Shaheer Mansoor original https://aws.amazon.com/blogs/big-data/modernize-your-legacy-databases-with-aws-data-lakes-part-2-build-a-data-lake-using-aws-dms-data-on-apache-iceberg/

This is part two of a three-part series where we show how to build a data lake on AWS using a modern data architecture. This post shows how to load data from a legacy database (SQL Server) into a transactional data lake (Apache Iceberg) using AWS Glue. We show how to build data pipelines using AWS Glue jobs, optimize them for both cost and performance, and implement schema evolution to automate manual tasks. To review the first part of the series, where we load SQL Server data into Amazon Simple Storage Service (Amazon S3) using AWS Database Migration Service (AWS DMS), see Modernize your legacy databases with AWS data lakes, Part 1: Migrate SQL Server using AWS DMS.

Solution overview

In this post, we go over the process of building a data lake, explain the rationale behind the different decisions, and share best practices for building such a solution.

The following diagram illustrates the different layers of the data lake.

Overall Architecture

To load data into the data lake, AWS Step Functions can define a workflow, Amazon Simple Queue Service (Amazon SQS) can track the order of incoming files, and AWS Glue jobs and the Data Catalog can be used to create the data lake silver layer. AWS DMS produces files and writes these files to the bronze bucket (as we explained in Part 1).

We can turn on Amazon S3 notifications and push the newly arriving file names to an SQS first-in-first-out (FIFO) queue. A Step Functions state machine can consume messages from this queue to process the files in the order they arrive.
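
A consumer of the queue has to pull the file names out of each message before it can start a job. The following is a minimal sketch, assuming each message body carries the standard Amazon S3 event notification JSON unchanged; the function name extract_object_keys and the bucket name in the usage example are hypothetical.

```python
import json
from urllib.parse import unquote_plus


def extract_object_keys(message_body):
    """Return (bucket, key) pairs from an S3 event notification message body."""
    event = json.loads(message_body)
    pairs = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys are URL-encoded in S3 event notifications.
        key = unquote_plus(record["s3"]["object"]["key"])
        pairs.append((bucket, key))
    return pairs


# Usage with a hand-written sample message (hypothetical bucket name).
sample = json.dumps({
    "Records": [
        {"s3": {"bucket": {"name": "my-bronze-bucket"},
                "object": {"key": "HumanResources/Employee/LOAD00000001.parquet"}}}
    ]
})
print(extract_object_keys(sample))
```

Decoding the key matters when table or folder names contain spaces or other characters that Amazon S3 encodes in the notification.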

For processing the files, we need to create two types of AWS Glue jobs:

  • Full load – This job loads the entire table data dump into an Iceberg table. Data types from the source are mapped to an Iceberg data type. After the data is loaded, the job updates the Data Catalog with the table schemas.
  • CDC – This job loads the change data capture (CDC) files into the respective Iceberg tables. The AWS Glue job implements the schema evolution feature of Iceberg to handle schema changes such as addition or deletion of columns.
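
To decide which of the two jobs should handle a file, the workflow can inspect the object key. The following is a minimal sketch under stated assumptions: it assumes the default AWS DMS layout for Amazon S3 targets, where keys end in schema_name/table_name/file_name, full load files are named LOAD followed by eight hexadecimal digits, and every other file is a CDC file. The function name classify_dms_file is hypothetical; adjust the pattern if your AWS DMS task uses different settings.

```python
import re
from pathlib import PurePosixPath

# Assumption: full load files look like LOAD00000001.csv or LOAD0000000A.parquet.
FULL_LOAD_PATTERN = re.compile(r"^LOAD[0-9A-F]{8}\.(csv|parquet)$", re.IGNORECASE)


def classify_dms_file(key):
    """Return (job_type, schema_name, table_name) for an object key."""
    parts = PurePosixPath(key).parts
    if len(parts) < 3:
        raise ValueError(f"Expected schema/table/file in key: {key}")
    schema_name, table_name, file_name = parts[-3:]
    job_type = "full_load" if FULL_LOAD_PATTERN.match(file_name) else "cdc"
    return job_type, schema_name, table_name


print(classify_dms_file("HumanResources/Employee/LOAD00000001.parquet"))
print(classify_dms_file("HumanResources/Employee/20240101-093000123.parquet"))
```

A Step Functions Choice state could branch on the returned job type to start either the full load job or the CDC job.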

As in Part 1, the AWS DMS jobs will place the full load and CDC data from the source database (SQL Server) in the raw S3 bucket. Now we process this data using AWS Glue and save it to the silver bucket in Iceberg format. AWS Glue has a plugin for Iceberg; for details, see Using the Iceberg framework in AWS Glue.

Along with moving data from the bronze to the silver bucket, we also create and update the Data Catalog for further processing the data for the gold bucket.

The following diagram illustrates how the full load and CDC jobs are defined inside the Step Functions workflow.

Step Functions for loading data into the lake

In this post, we focus on the AWS Glue jobs. To define the workflow, we recommend using AWS Step Functions Workflow Studio and setting up Amazon S3 event notifications and an SQS FIFO queue to receive the file names as messages.

Prerequisites

To follow the solution, you need the following prerequisites set up as well as certain access rights and AWS Identity and Access Management (IAM) privileges:

  • An IAM role to run AWS Glue jobs
  • IAM privileges to create AWS DMS resources (this role was created in Part 1 of this series; you can use the same role here)
  • The AWS DMS job from Part 1 working and producing files for the source database on Amazon S3.

Create an AWS Glue connection for the source database

We need to create a connection between AWS Glue and the source SQL Server database so the AWS Glue job can query the source for the latest schema while loading the data files. To create the connection, follow these steps:

  1. On the AWS Glue console, choose Connections in the navigation pane.
  2. Choose Create custom connector.
  3. Give the connection a name and choose JDBC as the connection type.
  4. In the JDBC URL section, enter the following string, replacing the placeholders with the source database endpoint and database name that you set up in Part 1: jdbc:sqlserver://{Your RDS End Point Name}:1433;databaseName={Your Database Name}.
  5. Select Require SSL connection, then choose Create connector.

Glue Connections

Create and configure the full load AWS Glue job

Complete the following steps to create the full load job:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Choose Script editor and select Spark.
  3. Choose Start fresh and select Create script.
  4. Enter a name for the full load job and choose the IAM role (mentioned in the prerequisites) for running the job.
  5. Finish creating the job.
  6. On the Job details tab, expand Advanced properties.
  7. In the Connections section, add the connection you created.
  8. Under Job parameters, pass the following arguments to the job:
    1. target_s3_bucket – The silver S3 bucket name.
    2. source_s3_bucket – The raw S3 bucket name.
    3. secret_id – The ID of the AWS Secrets Manager secret for the source database credentials.
    4. dbname – The source database name.
    5. datalake-formats – This sets the data format to iceberg.
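
If you start the job programmatically rather than from the console, the same parameters go into the Arguments map of the AWS Glue StartJobRun API, with each name prefixed by two hyphens. The following is a minimal sketch; the helper name build_job_arguments and all values in the usage comment are hypothetical placeholders.

```python
def build_job_arguments(target_s3_bucket, source_s3_bucket, secret_id, dbname):
    """Build the Arguments map for the full load job; Glue expects a -- prefix on names."""
    return {
        "--target_s3_bucket": target_s3_bucket,
        "--source_s3_bucket": source_s3_bucket,
        "--secret_id": secret_id,
        "--dbname": dbname,
        # Enables the Iceberg framework for the job.
        "--datalake-formats": "iceberg",
    }


# Usage (hypothetical job, bucket, and secret names):
# import boto3
# glue = boto3.client("glue")
# glue.start_job_run(
#     JobName="full-load-job",
#     Arguments=build_job_arguments("my-silver-bucket", "my-bronze-bucket",
#                                   "my-sqlserver-secret", "AdventureWorks"),
# )
```

Arguments passed to StartJobRun override the defaults saved on the job for that run only, which is convenient when a Step Functions state machine starts the job.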

Glue Job Parameters

The full load AWS Glue job starts after the AWS DMS task reaches 100%. The job loops over the files located in the raw S3 bucket and processes them one at a time. For each file, the job infers the table name from the file name and gets the source table schema, including column names and primary keys.

If the table has one or more primary keys, the job creates an equivalent Iceberg table. If the table has no primary key, the file is not processed. In our use case, all the tables have primary keys, so we enforce this check. Depending on your data, you might need to handle this scenario differently.
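
The primary key check can be expressed in plain Python once the key metadata has been collected from the source database. The following is a minimal sketch, assuming the metadata is available as (table name, primary key column) pairs; the function names and sample table names are hypothetical and not part of the job script.

```python
from collections import defaultdict


def group_primary_keys(rows):
    """Group (table_name, column_name) pairs into {table_name: [columns]}."""
    keys = defaultdict(list)
    for table_name, column_name in rows:
        keys[table_name].append(column_name)
    return dict(keys)


def split_by_primary_key(table_names, primary_keys):
    """Separate tables that can be loaded from tables skipped for lacking a key."""
    loadable = [name for name in table_names if primary_keys.get(name)]
    skipped = [name for name in table_names if not primary_keys.get(name)]
    return loadable, skipped


# Usage with sample metadata: Employee has a key, AuditLog does not.
rows = [("Employee", "BusinessEntityID"), ("Shift", "ShiftID")]
primary_keys = group_primary_keys(rows)
print(split_by_primary_key(["Employee", "Shift", "AuditLog"], primary_keys))
```

Keeping the skipped tables in a list, rather than silently ignoring them, makes it easier to report which files were left unprocessed at the end of the run.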

You can use the following code to process the full load files. To start the job, choose Run.

import sys
import json
import boto3
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession

#Get the arguments passed to the script
args = getResolvedOptions(sys.argv, ['JOB_NAME',
                           'target_s3_bucket',
                           'secret_id',
                           'source_s3_bucket',
                           'dbname'])
#Source database name, passed as the dbname job parameter
dbname = args['dbname']
schema = "HumanResources"

#Initialize parameters
target_s3_bucket = args['target_s3_bucket']
source_s3_bucket = args['source_s3_bucket']
secret_id = args['secret_id']
unprocessed_tables = []
drop_column_list = ['db', 'table_name', 'schema_name', 'Op', 'last_update_time']  # DMS added columns

#Helper Function: Get Credentials from Secrets Manager
def get_db_credentials(secret_id):
    secretsmanager = boto3.client('secretsmanager')
    response = secretsmanager.get_secret_value(SecretId=secret_id)
    secrets = json.loads(response['SecretString'])
    return secrets['host'], int(secrets['port']), secrets['username'], secrets['password']

#Helper Function: Load Iceberg table with Primary key(s)
def load_table(full_load_data_df, dbname, table_name):

    try:
        full_load_data_df = full_load_data_df.drop(*drop_column_list)
        full_load_data_df.createOrReplaceTempView('full_data')

        query = """
        CREATE TABLE IF NOT EXISTS glue_catalog.{0}.{1}
        USING iceberg
        LOCATION "s3://{2}/{0}/{1}"
        AS SELECT * FROM full_data
        """.format(dbname, table_name, target_s3_bucket)
        spark.sql(query)
        
        #Update Table property to accept Schema Changes
        spark.sql("""ALTER TABLE glue_catalog.{0}.{1} SET TBLPROPERTIES (
                      'write.spark.accept-any-schema'='true'
                    )""".format(dbname, table_name))
        
    except Exception as ex:
        print(ex)
        failed_table = {"table_name": table_name, "Reason": ex}
        unprocessed_tables.append(failed_table)
        
def get_table_key(host, port, username, password, dbname):
    
    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, dbname)
    
    connectionProperties = {
      "user" : username,
      "password" : password
    }
    
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.TABLE_CONSTRAINTS', properties=connectionProperties).createOrReplaceTempView("TABLE_CONSTRAINTS")
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE', properties=connectionProperties).createOrReplaceTempView("CONSTRAINT_COLUMN_USAGE")
    df_table_pkeys = spark.sql("select c.TABLE_NAME, C.COLUMN_NAME as primary_key FROM TABLE_CONSTRAINTS T JOIN CONSTRAINT_COLUMN_USAGE C ON C.CONSTRAINT_NAME=T.CONSTRAINT_NAME WHERE T.CONSTRAINT_TYPE='PRIMARY KEY'")
    return df_table_pkeys


#Setup Spark configuration for reading and writing Iceberg tables
spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://{0}".format(target_s3_bucket))
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .getOrCreate()
)


#Initialize MSSQL credentials
host, port, username, password = get_db_credentials(secret_id)

#Initialize primary keys for all tables
df_table_pkeys = get_table_key(host, port, username, password, dbname)

#Read Full load csv files from s3
s3 = boto3.client('s3')
full_load_tables = s3.list_objects_v2(Bucket=source_s3_bucket, Prefix="raw/{0}/{1}".format(dbname, schema))

#Loop over files
for item in full_load_tables['Contents']:
    pkey_list = []
    table_name = item["Key"].split("/")[3].lower()
    print("Table name {0}".format(table_name))
    # table_name is lowercased above, so compare against the lowercased source table name
    current_table_df = df_table_pkeys.where("lower(TABLE_NAME) = '{0}'".format(table_name))

    # Only process tables with at least one primary key
    if current_table_df.isEmpty():
        failed_table = {"table_name": table_name, "Reason": "No primary key"}
        unprocessed_tables.append(failed_table)
        # ToDo Handle these cases
        continue

    for i in current_table_df.collect():
        pkey_list.append(i["primary_key"])

    full_data_path = "s3://{0}/{1}".format(source_s3_bucket, item['Key'])
    full_load_data_df = (spark
                        .read
                        .option("header", True)
                        .option("inferSchema", True)
                        .option("recursiveFileLookup", "true")
                        .csv(full_data_path)
                        )

    primary_key = ",".join(pkey_list)

    load_table(full_load_data_df, dbname, table_name)

When the job is complete, it creates the database and tables in the Data Catalog, as shown in the following screenshot.

Data lake silver layer data

Create and configure the CDC AWS Glue job

The CDC AWS Glue job is created similarly to the full load job. As with the full load AWS Glue job, you need to use the source database connection and pass the job parameters with one additional parameter, cdc_file, which contains the location of the CDC file to be processed. Because a CDC file can contain data for multiple tables, the job loops over the tables in a file and loads the table metadata (the RDS column names) from the source table.

If the CDC operation is DELETE, the job deletes the records from the Iceberg table. If the CDC operation is INSERT or UPDATE, the job merges the data into the Iceberg table.

You can use the following code to process the CDC files. To start the job, choose Run.

import sys
import boto3
import json
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession

# Get the arguments passed to the script
args = getResolvedOptions(sys.argv, ['JOB_NAME',
                           'target_s3_bucket',
                           'secret_id',
                           'source_s3_bucket',
                           'cdc_file'])
dbname = "AdventureWorks"
schema = "HumanResources"
target_s3_bucket = args['target_s3_bucket']
source_s3_bucket = args['source_s3_bucket']
secret_id = args['secret_id']
cdc_file = args['cdc_file']
unprocessed_tables = []
drop_column_list = ['db', 'table_name', 'schema_name', 'Op', 'last_update_time']  # DMS added columns
source_s3_cdc_file_key = "raw/AdventureWorks/cdc/" + cdc_file



# Helper Function: Get Credentials from Secrets Manager
def get_db_credentials(secret_id):
    secretsmanager = boto3.client('secretsmanager')
    response = secretsmanager.get_secret_value(SecretId=secret_id)
    secrets = json.loads(response['SecretString'])
    return secrets['host'], int(secrets['port']), secrets['username'], secrets['password']

# Helper Function: Column names from RDS
def get_table_colums(table, host, port, username, password, dbname):

    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, dbname)
    
    connectionProperties = {
      "user" : username,
      "password" : password
    }
    
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.COLUMNS', properties= connectionProperties).createOrReplaceTempView("TABLE_COLUMNS")
    columns = list((row.COLUMN_NAME) for (index, row) in spark.sql("select TABLE_NAME, TABLE_CATALOG, COLUMN_NAME from TABLE_COLUMNS where TABLE_NAME = '{0}' and TABLE_CATALOG = '{1}'".format(table, dbname)).select("COLUMN_NAME").toPandas().iterrows())
    return columns

# Helper Function: Get column names and data types from RDS
def get_table_colum_datatypes(table, host, port, username, password, dbname):

    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, dbname)
    
    connectionProperties = {
      "user" : username,
      "password" : password
    }
    
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.COLUMNS', properties= connectionProperties).createOrReplaceTempView("TABLE_COLUMNS")
    return spark.sql("select TABLE_NAME, COLUMN_NAME, DATA_TYPE from TABLE_COLUMNS WHERE TABLE_NAME ='{0}'".format(table))

# Helper Function: Setup the primary key condition
def get_iceberg_table_condition(database, tablename):
    
    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, database)
    
    connectionProperties = {
      "user" : username,
      "password" : password
    }
    
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.TABLE_CONSTRAINTS', properties=connectionProperties).createOrReplaceTempView("TABLE_CONSTRAINTS")
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE', properties=connectionProperties).createOrReplaceTempView("CONSTRAINT_COLUMN_USAGE")
    
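    # Compare table names case-insensitively because callers pass a lowercased name
    key_rows = spark.sql("select C.COLUMN_NAME FROM TABLE_CONSTRAINTS T JOIN CONSTRAINT_COLUMN_USAGE C ON C.CONSTRAINT_NAME=T.CONSTRAINT_NAME WHERE T.CONSTRAINT_TYPE='PRIMARY KEY' AND lower(C.TABLE_NAME) = '{0}'".format(tablename.lower())).collect()

    # Join one equality per key column so that composite keys produce a valid condition
    return " and ".join("target.{0} = source.{0}".format(key.COLUMN_NAME) for key in key_rows)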

    
# Read incoming data from Amazon S3
def read_cdc_S3(source_s3_bucket, source_s3_cdc_file_key):
    
    inputDf = (spark
                    .read
                    .option("header", False)
                    .option("inferSchema", True)
                    .option("recursiveFileLookup", "true")
                    .csv("s3://" + source_s3_bucket + "/" + source_s3_cdc_file_key)
                    )
    return inputDf

# Setup Spark configuration for reading and writing Iceberg tables
spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://{0}".format(target_s3_bucket))
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .getOrCreate()
)

#Initialize MSSQL credentials
host, port, username, password = get_db_credentials(secret_id)

#Read the cdc file 
cdc_df = read_cdc_S3(source_s3_bucket, source_s3_cdc_file_key)

tables = cdc_df.toPandas()._c1.unique().tolist()

#Loop over tables in the cdc file
for table in tables:
    #Create dataframes for deletes and for inserts and updates
    table_df_deletes = cdc_df.where((cdc_df._c1 == table) & (cdc_df._c0 == "D")).drop(cdc_df.columns[0], cdc_df.columns[1], cdc_df.columns[2], cdc_df.columns[3])
    table_df_upserts = cdc_df.where((cdc_df._c1 == table) & ((cdc_df._c0 == "I") | (cdc_df._c0 == "U"))).drop(cdc_df.columns[0], cdc_df.columns[1], cdc_df.columns[2], cdc_df.columns[3])
    
    #Update column names for the dataframes
    columns = get_table_colums(table, host, port, username, password, dbname) 
    selectExpr = [] 

    for column in columns: 
        selectExpr.append(cdc_df.where((cdc_df._c1 == table)).drop(cdc_df.columns[0], cdc_df.columns[1], cdc_df.columns[2], cdc_df.columns[3]).columns[columns.index(column)] + " as " + column)

    table_df_deletes = table_df_deletes.selectExpr(selectExpr) 
    table_df_upserts = table_df_upserts.selectExpr(selectExpr)
    
    #Process Deletes
    if table_df_deletes.count() > 0:
        
        print("Delete Triggered")
        table_df_deletes.createOrReplaceTempView('deleted_rows')
        
        sql_string = """MERGE INTO glue_catalog.{0}.{1} target
                        USING (SELECT * FROM deleted_rows) source
                        ON {2}
                        WHEN MATCHED 
                        THEN DELETE""".format(dbname.lower(), table.lower(), get_iceberg_table_condition(dbname.lower(), table.lower()))
        spark.sql(sql_string)
    
    if table_df_upserts.count() > 0:
        print("Upsert triggered")

        #Upsert Records when there are Schema Changes
        if len(table_df_upserts.columns) != len(columns):

            #Handle column deletes
            if len(table_df_upserts.columns) < len(columns):

                drop_columns = list(set(columns) - set(table_df_upserts.columns))

                for drop_column in drop_columns:
                    sql_string = """
                                    ALTER TABLE glue_catalog.{0}.{1}
                                    DROP COLUMN {2}""".format(dbname.lower(), table.lower(), drop_column)
                    spark.sql(sql_string)

            #Handle column additions
            elif len(table_df_upserts.columns) > len(columns):

                column_datatype_df = get_table_colum_datatypes(table, host, port, username, password, dbname)
                add_columns = list(set(table_df_upserts.columns) - set(columns))

                for add_column in add_columns:

                    #Set Iceberg data type
                    data_type = list((row.DATA_TYPE) for (index, row) in column_datatype_df.filter("COLUMN_NAME='{0}'".format(add_column)).select("DATA_TYPE").toPandas().iterrows())[0]

                    # Convert MSSQL Datatypes to Iceberg supported datatypes
                    if data_type.lower() in ["varchar", "char"]:
                        data_type = "string"

                    if data_type.lower() in ["bigint"]:
                        data_type = "long"

                    if data_type.lower() in ["array"]:
                        data_type = "list"

                    sql_string = """
                                    ALTER TABLE glue_catalog.{0}.{1}
                                    ADD COLUMN {2} {3}""".format(dbname.lower(), table.lower(), add_column, data_type)
                    spark.sql(sql_string)
                    
        #Create statement to update columns (runs for every upsert batch, with or without schema changes)
        update_table_column_list = ""
        insert_column_list = ""
        columns = get_table_colums(table, host, port, username, password, dbname)

        for column in columns:

            update_table_column_list+="""target.{0}=source.{0},""".format(column)
            insert_column_list+="""source.{0},""".format(column)

        table_df_upserts.createOrReplaceTempView('updated_rows')

        sql_string = """MERGE INTO glue_catalog.{0}.{1} target
                        USING (SELECT * FROM updated_rows) source
                        ON {2}
                        WHEN MATCHED 
                        THEN UPDATE SET {3} 
                        WHEN NOT MATCHED THEN INSERT ({4}) VALUES ({5})""".format(dbname.lower(), 
                                                                                  table.lower(), 
                                                                                  get_iceberg_table_condition(dbname.lower(), table.lower()), 
                                                                                  update_table_column_list.rstrip(","), 
                                                                                  ",".join(columns), 
                                                                                  insert_column_list.rstrip(","))

        spark.sql(sql_string)

    
print("CDC job complete")

The Iceberg MERGE INTO syntax can handle cases where a new column is added. For more details on this feature, see the Iceberg MERGE INTO syntax documentation. If the CDC job needs to process many tables in the CDC file, the job can be multi-threaded to process the file in parallel.
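The parallel processing mentioned above could be sketched as follows. This is a minimal illustration, assuming that each table's changes can be applied independently of the other tables; `process_table` is a hypothetical stand-in for the body of the per-table loop in the CDC job and is not part of the original script.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_table(table):
    # Hypothetical placeholder for the per-table work in the CDC loop
    # (build the delete and upsert DataFrames, then run the MERGE statements).
    return "processed {0}".format(table)


def process_tables_in_parallel(tables, worker=process_table, max_workers=4):
    """Run worker(table) for every table on a thread pool and collect the outcomes."""
    results, failures = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(worker, table): table for table in tables}
        for future in as_completed(futures):
            table = futures[future]
            try:
                results[table] = future.result()
            except Exception as ex:
                # Record the failure and continue so one table doesn't stop the others
                failures[table] = str(ex)
    return results, failures


results, failures = process_tables_in_parallel(["Employee", "Department", "Shift"])
print(results, failures)
```

Because each table is handled by exactly one worker, the changes for a given table are still applied in a single place; only different tables run concurrently.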

 

Configure EventBridge notifications, SQS queue, and Step Functions state machine

You can configure Amazon S3 to send notifications to EventBridge when certain events occur on S3 buckets, such as when objects are created or deleted. For this post, we’re interested in the events when new CDC files from AWS DMS arrive in the bronze S3 bucket. You can create an EventBridge rule for new objects that inserts the file names into an SQS queue. A Lambda function within Step Functions then consumes from the queue, extracts the file name, starts a CDC AWS Glue job, and passes the file name as a parameter to the job.
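The Lambda step described above might look roughly like the following sketch. It assumes the SQS message body is the unmodified EventBridge "Object Created" event for Amazon S3 and that the CDC files live under the `raw/AdventureWorks/cdc/` prefix used by the CDC job. The job name and the sample bucket and file names are hypothetical, and the AWS Glue client is passed in (for example, `boto3.client("glue")`) rather than created inside the function.

```python
import json


def extract_cdc_file(message_body, cdc_prefix="raw/AdventureWorks/cdc/"):
    """Return the CDC file name from an S3 'Object Created' event delivered through EventBridge."""
    event = json.loads(message_body)
    key = event["detail"]["object"]["key"]
    if not key.startswith(cdc_prefix):
        raise ValueError("Unexpected object key: {0}".format(key))
    return key[len(cdc_prefix):]


def start_cdc_job(glue_client, job_name, message_body):
    """Start the CDC AWS Glue job for the file named in one SQS message."""
    cdc_file = extract_cdc_file(message_body)
    # AWS Glue job parameters are passed with a leading "--"
    response = glue_client.start_job_run(
        JobName=job_name,
        Arguments={"--cdc_file": cdc_file},
    )
    return response["JobRunId"]


# Hypothetical sample event, trimmed to the fields this sketch reads
sample_body = json.dumps({
    "detail-type": "Object Created",
    "source": "aws.s3",
    "detail": {
        "bucket": {"name": "example-bronze-bucket"},
        "object": {"key": "raw/AdventureWorks/cdc/20240101-120000000.csv"},
    },
})
print(extract_cdc_file(sample_body))
```

The remaining job parameters (target_s3_bucket, source_s3_bucket, and secret_id) are assumed to be configured as default arguments on the job itself.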

AWS DMS CDC files contain database insert, update, and delete statements. We need to process these in order, so we use an SQS FIFO queue, which preserves the order in which messages arrive. You can also configure a time to live (TTL) for messages in Amazon SQS; this parameter defines how long a message stays in the queue before it expires.

Another important parameter to consider when configuring an SQS queue is the message visibility timeout value. While a message is being processed, it disappears from the queue to make sure that the message isn’t consumed by multiple consumers (AWS Glue jobs in our case). If the message is consumed successfully, it should be deleted from the queue before the visibility timeout. However, if the visibility timeout expires and the message isn’t deleted, the message reappears in the queue. In our solution, this timeout must be greater than the time it takes for the CDC job to process a file.
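As a rough sketch of these settings, the following helper builds the attribute map for a FIFO queue from an estimate of the CDC job's longest runtime. The helper name and the safety factor are assumptions made for illustration; the attribute names and limits are those of the Amazon SQS API, and enabling content-based deduplication is one possible choice that you should verify against your setup.

```python
def fifo_queue_attributes(max_job_seconds, retention_days=14, safety_factor=2):
    """Build the Attributes map for an SQS FIFO queue that feeds the CDC job."""
    # Keep the message hidden for longer than the slowest expected job run
    visibility_timeout = max_job_seconds * safety_factor
    if not 0 <= visibility_timeout <= 43200:
        raise ValueError("Visibility timeout must be between 0 and 43,200 seconds")
    if not 1 <= retention_days <= 14:
        raise ValueError("This sketch accepts a retention period of 1 to 14 days")
    return {
        "FifoQueue": "true",
        # Assumption: deduplicate on message content instead of an explicit ID
        "ContentBasedDeduplication": "true",
        "VisibilityTimeout": str(visibility_timeout),
        "MessageRetentionPeriod": str(retention_days * 24 * 60 * 60),
    }


attributes = fifo_queue_attributes(max_job_seconds=600)
print(attributes)
```

The resulting map can be passed as the Attributes argument when the queue is created, for example with the boto3 SQS client's create_queue call. FIFO queue names must end with the .fifo suffix.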

Lastly, we recommend using Step Functions to define a workflow for handling the full load and CDC files. Step Functions has built-in integrations to other AWS services like Amazon SQS, AWS Glue, and Lambda, which makes it a good candidate for this use case.

The Step Functions state machine starts by checking the status of the AWS DMS task. The AWS DMS tasks can be queried to check the status of the full load, and we check the value of the parameter FullLoadProgressPercent. When this value gets to 100%, we can start processing the full load files. After the AWS Glue job processes the full load files, we start polling the SQS queue to check the size of the queue. If the queue size is greater than 0, this means new CDC files have arrived and we can start the AWS Glue CDC job to process these files. The AWS Glue job processes the CDC files and deletes the messages from the queue. When the queue size reaches 0, the AWS Glue job exits and we loop in the Step Functions workflow to check the SQS queue size again.
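The branching logic of this workflow can be summarized as a small decision function. This is only an illustration of the flow described above; in the state machine itself these checks would be Choice states, and the action names below are hypothetical.

```python
def next_step(full_load_progress_percent, full_load_processed, queue_size):
    """Return the next action for the workflow, given the current observations."""
    if full_load_progress_percent < 100:
        # AWS DMS is still running the full load
        return "WAIT_FOR_FULL_LOAD"
    if not full_load_processed:
        # The full load files are complete but not yet loaded into Iceberg
        return "RUN_FULL_LOAD_JOB"
    if queue_size > 0:
        # New CDC files are waiting in the SQS queue
        return "RUN_CDC_JOB"
    return "WAIT_AND_POLL_QUEUE"


print(next_step(100, True, 3))
```

Here, full_load_progress_percent stands for the FullLoadProgressPercent value reported for the AWS DMS task, and queue_size for the approximate number of messages reported by Amazon SQS.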

Because the Step Functions state machine is supposed to run indefinitely, keep in mind the service limits you need to adhere to: the maximum runtime, which is 1 year, and the maximum run history size (the state transitions or events for a state machine run), which is 25,000. We recommend adding a step at the end of the workflow that checks whether either limit is close to being reached and, if so, stops the current state machine run and starts a new one.

The following diagram illustrates how you can use Step Functions state machine history size to monitor and start a new Step Functions state machine run.

Step Functions Workflow
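The check at the end of the workflow could be sketched as follows. The two limits come from the text above; the safety margins and the function name are assumptions chosen for illustration, and how you obtain the event count for the current run is left out.

```python
from datetime import datetime, timedelta, timezone

MAX_HISTORY_EVENTS = 25000         # maximum run history size stated above
MAX_RUNTIME = timedelta(days=365)  # maximum runtime stated above


def should_start_new_run(event_count, started_at, now=None,
                         event_margin=0.8, runtime_margin=0.9):
    """Return True when the current run is close to either limit."""
    now = now or datetime.now(timezone.utc)
    near_event_limit = event_count >= MAX_HISTORY_EVENTS * event_margin
    near_runtime_limit = (now - started_at) >= MAX_RUNTIME * runtime_margin
    return near_event_limit or near_runtime_limit


started = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(should_start_new_run(21000, started, now=started + timedelta(days=30)))
```

Restarting well before the hard limits leaves room for the steps that stop the current run and start the next one.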

Configure the pipeline

The pipeline needs to be configured to address cost, performance, and resilience goals. You might want a pipeline that can load fresh data into the data lake and make it available quickly, and you might also want to optimize costs by loading large chunks of data into the data lake. At the same time, you should make the pipeline resilient and be able to recover in case of failures. In this section, we cover the different parameters and recommended settings to achieve these goals.

Step Functions is designed to process incoming AWS DMS CDC files by running AWS Glue jobs. AWS Glue jobs can take a couple of minutes to boot up, and when they’re running, it’s efficient to process large chunks of data. You can configure AWS DMS to write CSV files to Amazon S3 by configuring the following AWS DMS task parameters:

  • CdcMaxBatchInterval – Defines the maximum time limit AWS DMS will wait before writing a batch to Amazon S3
  • CdcMinFileSize – Defines the minimum file size AWS DMS will write to Amazon S3

Whichever condition is met first will invoke the write operation. If you want to prioritize data freshness, you should have a short CdcMaxBatchInterval value (10 seconds) and a small CdcMinFileSize value (1–5 MB). This will result in many small CSV files being written to Amazon S3 and will invoke a lot of AWS Glue jobs to process the data, making the extract, transform, and load (ETL) process faster. If you want to optimize costs, you should have a moderate CdcMaxBatchInterval (minutes) and a large CdcMinFileSize value (100–500 MB). In this scenario, we start a few AWS Glue jobs that will process large chunks of data, making the ETL flow more efficient. In a real-world use case, the required values for these parameters might fall somewhere that’s a good compromise between throughput and cost. You can configure these parameters when creating a target endpoint using the AWS DMS console, or by using the create-endpoint command in the AWS Command Line Interface (AWS CLI).

For the full list of parameters, see Using Amazon S3 as a target for AWS Database Migration Service.
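To make the trade-off concrete, the following sketch builds the two S3 target settings for a "freshness" and a "cost" profile. The profile names, the helper, and the exact values are illustrative choices within the ranges mentioned above. The sketch assumes CdcMaxBatchInterval is expressed in seconds and CdcMinFileSize in kilobytes, which you should confirm in the AWS DMS documentation for your version.

```python
def s3_target_settings(bucket_name, profile):
    """Return a partial S3 settings map for the AWS DMS target endpoint."""
    profiles = {
        # Prioritize freshness: write often and accept many small files
        "freshness": {"CdcMaxBatchInterval": 10, "CdcMinFileSize": 5 * 1000},
        # Prioritize cost: write every few minutes or once a large file has built up
        "cost": {"CdcMaxBatchInterval": 300, "CdcMinFileSize": 500 * 1000},
    }
    if profile not in profiles:
        raise ValueError("Unknown profile: {0}".format(profile))
    settings = {"BucketName": bucket_name}
    settings.update(profiles[profile])
    return settings


print(s3_target_settings("example-bronze-bucket", "freshness"))
print(s3_target_settings("example-bronze-bucket", "cost"))
```

A real endpoint needs additional settings, such as the service access role, which are omitted here.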

Choosing the right AWS Glue worker types for the full load and CDC jobs is also crucial for performance and cost optimization. The AWS Glue (Spark) workers range from G.1X to G.8X, which have an increasing number of data processing units (DPUs). Full load files are usually much larger than CDC files, so it’s more cost- and performance-effective to select a larger worker for the full load job. For CDC files, it’s more cost-effective to select a smaller worker because file sizes are smaller.

You should design the Step Functions state machine in such a way that if anything fails, the pipeline can be redeployed after repair and resume processing from where it left off. One important parameter here is TTL for the messages in the SQS queue. This parameter defines how long a message stays in the queue before expiring. In case of failures, we want this parameter to be long enough for us to deploy a fix. Amazon SQS has a maximum of 14 days for a message’s TTL. We recommend setting this to a large enough value to minimize messages being expired in case of pipeline failures.

Clean up

Complete the following steps to clean up the resources you created in this post:

  1. Delete the AWS Glue jobs:
    1. On the AWS Glue console, choose ETL jobs in the navigation pane.
    2. Select the full load and CDC jobs and on the Actions menu, choose Delete.
    3. Choose Delete to confirm.
  2. Delete the Iceberg tables:
    1. On the AWS Glue console, under Data Catalog in the navigation pane, choose Databases.
    2. Choose the database in which the Iceberg tables reside.
    3. Select the tables to delete, choose Delete, and confirm the deletion.
  3. Delete the S3 bucket:
    1. On the Amazon S3 console, choose Buckets in the navigation pane.
    2. Choose the silver bucket and empty the files in the bucket.
    3. Delete the bucket.

Conclusion

In this post, we showed how to use AWS Glue jobs to load AWS DMS files into a transactional data lake framework such as Iceberg. In our setup, AWS Glue provided highly scalable and simple-to-maintain ETL jobs. We also shared a proposed solution that uses Step Functions to create an ETL pipeline workflow, with Amazon S3 notifications and an SQS queue to capture newly arriving files. Finally, we shared how to design this system to be resilient to failures and to automate one of the most time-consuming tasks in maintaining a data lake: schema evolution.

In Part 3, we will share how to process the data lake to create data marts.


About the Authors

Shaheer Mansoor is a Senior Machine Learning Engineer at AWS, where he specializes in developing cutting-edge machine learning platforms. His expertise lies in creating scalable infrastructure to support advanced AI solutions. His focus areas are MLOps, feature stores, data lakes, model hosting, and generative AI.

Anoop Kumar K M is a Data Architect at AWS with a focus on data and analytics. He helps customers build scalable data platforms and shape their enterprise data strategy. His areas of interest are data platforms, data analytics, security, file systems, and operating systems. Anoop loves to travel and enjoys reading books in the crime fiction and financial domains.

Sreenivas Nettem is a Lead Database Consultant at AWS Professional Services. He has experience working with Microsoft technologies with a specialization in SQL Server. He works closely with customers to help migrate and modernize their databases to AWS.

Simplify your query performance diagnostics in Amazon Redshift with Query profiler

Post Syndicated from Raks Khare original https://aws.amazon.com/blogs/big-data/simplify-your-query-performance-diagnostics-in-amazon-redshift-with-query-profiler/

Amazon Redshift is a fast, scalable, secure, and fully managed cloud data warehouse that lets you analyze your data at scale. Amazon Redshift Serverless lets you access and analyze data without the usual configurations of a provisioned data warehouse. Resources are automatically provisioned and data warehouse capacity is intelligently scaled to deliver fast performance for even the most demanding and unpredictable workloads. If you prefer to manage your Amazon Redshift resources manually, you can create provisioned clusters for your data querying needs. For more information, refer to Amazon Redshift clusters.

Amazon Redshift provides performance metrics and data so you can track the health and performance of your provisioned clusters, serverless workgroups, and databases. The performance data you can use on the Amazon Redshift console falls into two categories:

  • Amazon CloudWatch metrics – Help you monitor the physical aspects of your cluster or serverless workgroup, such as resource utilization, latency, and throughput.
  • Query and load performance data – Helps you monitor database activity and inspect and diagnose query performance problems.

Amazon Redshift has introduced a new feature called the Query profiler. The Query profiler is a graphical tool that helps users analyze the components and performance of a query. This feature is part of the Amazon Redshift console and provides a visual and graphical representation of the query’s run order, execution plan, and various statistics. The Query profiler makes it easier for users to understand and troubleshoot their queries.

In this post, we cover two common use cases for troubleshooting query performance. We show you step-by-step how to analyze and troubleshoot long-running queries using the Query profiler.

Overview

For Amazon Redshift Serverless, the Query profiler can be accessed by going to the Serverless console. Choose Query and database monitoring, select a query, and then navigate to the Query plan tab. If a query plan is available, you will observe a list of child queries. Choose a query to view it in Query profiler.

For Amazon Redshift provisioned, the Query profiler can be accessed by going to the provisioned clusters dashboard. Choose Query and loads, and choose a query. Navigate to the Query plan tab. If a query plan is available, you will observe a list of child queries. Choose a query to view it in Query profiler.

Prerequisites

  • You can use the following sample AWS Identity and Access Management (IAM) policy to configure your IAM user or role with the minimum privileges needed to access Query profiler from the AWS console. If your IAM user or role already has access to the Query and loads section of the Redshift provisioned cluster dashboard or the Query and database monitoring section of the Redshift Serverless dashboard, no additional permissions are needed:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "redshift:DescribeClusters",
                "redshift-serverless:ListNamespaces",
                "redshift-serverless:ListWorkgroups",
                "redshift-data:ExecuteStatement",
                "redshift-data:DescribeStatement",
                "redshift-data:GetStatementResult"
            ],
            "Resource": [
                "arn:aws:redshift-serverless:<your-namespace>",
                "arn:aws:redshift-serverless:<your-workgroupname>",
                "arn:aws:redshift:<your-clustername>"
            ]
        }
    ]
}
  • You can use Query profiler in your account with an existing Amazon Redshift data warehouse and your own queries. However, if you would like to implement this demo in your existing Amazon Redshift data warehouse, download the Redshift query editor v2 notebook, Redshift Query profiler demo, and refer to the Data loading section later in this post.
  • You must connect to the cluster using database credentials and grant the sys:operator or sys:monitor role to the database user to view queries run by users.

Data loading

Amazon Redshift Query Editor v2 comes with sample data that can be loaded into a sample database and corresponding schema. To test Query profiler against the sample data, load the tpcds sample data and run queries.

  1. To load the tpcds sample data, launch Redshift query editor v2 and expand the database sample_data_dev.
  2. Choose the icon associated with the tpcds.
  3. The query editor v2 then loads the data into a schema tpcds in the database sample_data_dev.

The following screenshot shows these steps.
Load Data

  4. Verify the data by running the following sample query, as shown in the following screenshot.
select count(*) from sample_data_dev.tpcds.customer;

Verify Data

Use cases

In this post, we describe two common use cases around query performance and how to use Query profiler to troubleshoot the performance issues:

  1. Nested loop joins – This join type is the slowest of the possible join types. Nested loop joins typically come from cross joins without a join condition, which produce the Cartesian product of two tables.
  2. Suboptimal data distribution – If data distribution is suboptimal, you might notice a large broadcast or redistribution of data across compute nodes when two large tables are joined together.
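To see why a missing join condition is so costly, consider the following toy illustration in plain Python. The two small lists are made-up data and only stand in for tables; the point is that a cross join returns the product of the two row counts, whereas a join on a key returns only the matching pairs.

```python
from itertools import product

# Made-up rows standing in for two tables
sales = [{"ticket": t, "item": t % 3} for t in range(6)]          # 6 rows
returns = [{"ticket": t, "reason": "r"} for t in range(0, 6, 2)]  # 3 rows

# Cross join: no join condition, so every sales row pairs with every returns row
cross_join = [(s, r) for s, r in product(sales, returns)]

# Equi-join: only rows with matching ticket numbers are paired
returns_by_ticket = {r["ticket"]: r for r in returns}
equi_join = [
    (s, returns_by_ticket[s["ticket"]])
    for s in sales
    if s["ticket"] in returns_by_ticket
]

print(len(cross_join))  # 18 (6 x 3)
print(len(equi_join))   # 3
```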

Use case 1: Nested loop joins

To troubleshoot performance issues with nested loop joins using Query profiler, follow these steps:

  1. Import the notebook that you downloaded in the Prerequisites section into Redshift query editor v2.
  2. Set the context of database to sample_data_dev in Query Editor v2, as shown in the following screenshot.
    Set the database context
  3. Run cell #3 from the demo notebook to diagnose a query performance issue related to nested loop joins.
    Step 3

The query takes around 12 seconds to run, as shown in the Query Editor v2 results panel in the following screenshot.

Step 4 results

  1. Run cell #5 to capture the query ID from the SYS_QUERY_HISTORY system view, filtering on the query label you set in the preceding step.
    Cell 5
  2. On the Amazon Redshift console, in the navigation pane, select Query and loads and choose the cluster name where the query was originally executed, as shown in the following screenshot.
    Query and loads
  3. This will open the new Query profiler. Under the Query history section, choose Connect to database. After a successful connection to the database, the Status shows as Connected and the query history is displayed, as shown in the following screenshot.
    Connect to database
  4. You can find your queries either by Query ID or Process ID. Enter the Query ID captured in the preceding step to filter the long-running query for further analysis and choose the corresponding Query ID, as shown in the following screenshot.
    Search query
  5. Under the Query plan section, choose Child query 1, as shown in the following screenshot. If there are multiple child queries, you will have to inspect each one for performance issues.
    Child query
    This will open the query plan in a tree view along with additional metrics on the side panel. This allows you to quickly analyze the query streams, segments, and steps. For more information about streams, segments, and steps, refer to Query planning and execution workflow in the Amazon Redshift Database Developer Guide.
  6. Turn on View streams and, in the Streams side panel, identify which stream has the highest execution time. In this case, Stream ID 5 is where the query spends the majority of its time, as shown in the following screenshot.
    Enable view stream
  7. In the Streams side panel, under ID, select 5 to focus on Stream 5 for further analysis. Stream 5 shows a step of Nestloop, as shown in the following screenshot.
    Nestloop step
  8. Choose the Nestloop step to further analyze. The side panel will change with step details and additional metrics about the nested loop join.
  9. By looking at Step details – nestloop, we can inspect the Input rows and compare that with the Output rows, as shown in the following screenshot. In this case, because of the cross join with the store_returns table, 287,514 input rows explode to 950,233,770 output rows, which causes the query to run slowly.
    Nestloop step details
  10. Fix the query by introducing a join condition between the store_sales and store_returns tables. Run cell #7 from the Query editor v2 demo notebook. The rewritten query runs in just 307 milliseconds.
    Cell 7
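The query ID lookup described in the steps above can also be scripted. The following helper is a sketch that builds a lookup statement against SYS_QUERY_HISTORY for a given query label; the helper name and the label value are hypothetical, and the actual statement in the demo notebook may differ.

```python
def query_id_lookup_sql(query_label, limit=5):
    """Build a SYS_QUERY_HISTORY lookup for the most recent queries run under a label."""
    # Double any single quotes so the label is a valid SQL string literal
    safe_label = query_label.replace("'", "''")
    return (
        "SELECT query_id, elapsed_time, start_time, query_text "
        "FROM sys_query_history "
        "WHERE query_label = '{0}' "
        "ORDER BY start_time DESC "
        "LIMIT {1};"
    ).format(safe_label, int(limit))


# Hypothetical label; use the label set in your own session
print(query_id_lookup_sql("nested_loop_demo"))
```

You can run the generated statement in query editor v2 and use the returned query_id to filter the query history in Query profiler.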

Use case 2: Suboptimal data distribution

  1. To demonstrate suboptimal data distribution, change the distribution style of tables web_sales and web_returns to EVEN by running cell #10 of the Query editor v2 demo notebook.
    Cell 10
  2. Run cell #12. The query takes 409 milliseconds to run, as shown by the elapsed time in the following screenshot of the Query editor v2.
    Cell 12
  3. Follow steps 3–10 from use case 1 to locate the query_id and to open the Query profiler view for the preceding query.
  4. On the Query profiler page for the preceding query, turn on View streams. In the Streams side panel, investigate and identify which stream has the highest execution time. In this case, Stream ID 6 is where the query spends a majority of the time, as shown in the following screenshot.
    View streams
  5. Under ID, select 6 from the Streams side panel for further analysis.
    Streams side panel

Stream 6 contains a hash join step in which both tables are redistributed. This can be inferred from Hash Right Join DS_DIST_BOTH under Explain plan node information in the following screenshot. Usually, these redistributions occur because the tables aren’t joined on their distribution keys, or they don’t have the correct distribution style. For large tables, these redistributions can lead to significant performance degradation, so it is important to identify and fix such steps to optimize query performance.

Hashjoin step

  1. Fix this suboptimal data distribution pattern by choosing appropriate distribution keys on the tables involved: web_sales and web_returns. To change the distribution styles, run the ALTER TABLE commands in cell #14 of the demo notebook.
    Cell 14
  2. After the preceding commands finish running, run cell #16 to rerun the select query. As shown in the Query editor in the following screenshot, the same query now finishes in 244 milliseconds after the distribution style of the web_sales and web_returns tables is changed to KEY.
    Cell 16

  3. In the Query profiler view, turn on View streams and notice that Stream 5 now takes the most time. It took 8 milliseconds to finish, compared to 13 milliseconds in the preceding step.
    View streams
  4. In the Streams side panel, under ID, select 5 to drill down further, then choose the Hashjoin step. As the following screenshot shows, after changing the distribution style to KEY for both the web_sales and web_returns tables, neither table needs to be redistributed at query runtime, resulting in optimized performance.
    Hashjoin step
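
To see why distributing both tables on the join key removes the redistribution, consider the following toy model. It is a sketch under stated assumptions, not a description of how Amazon Redshift places rows internally: the slice count, the modulo stand-in for a hash function, and all function names are hypothetical.

```python
from collections import defaultdict

NUM_SLICES = 4  # hypothetical number of slices


def distribute_by_key(rows, key_index=0):
    """KEY-style placement: a row's slice depends only on its key value.
    The modulo is a stand-in for a real hash function."""
    slices = defaultdict(list)
    for row in rows:
        slices[row[key_index] % NUM_SLICES].append(row)
    return slices


def distribute_evenly(rows):
    """EVEN-style placement: round-robin, ignoring the row's contents."""
    slices = defaultdict(list)
    for position, row in enumerate(rows):
        slices[position % NUM_SLICES].append(row)
    return slices


def rows_on_wrong_slice(left_slices, right_slices, key_index=0):
    """Count right-side rows whose matching left-side key lives elsewhere."""
    key_home = {row[key_index]: slice_id
                for slice_id, rows in left_slices.items() for row in rows}
    return sum(1
               for slice_id, rows in right_slices.items()
               for row in rows
               if key_home.get(row[key_index]) != slice_id)


# Hypothetical single-column tables keyed on an order number.
web_sales = [(order_id,) for order_id in range(100)]
web_returns = [(order_id,) for order_id in range(0, 100, 3)]

even_moves = rows_on_wrong_slice(distribute_evenly(web_sales),
                                 distribute_evenly(web_returns))
key_moves = rows_on_wrong_slice(distribute_by_key(web_sales),
                                distribute_by_key(web_returns))
print(f"EVEN: {even_moves} rows must move, KEY: {key_moves} rows must move")
```

When both tables are placed by the same function of the join key, matching rows always land on the same slice, so the join can run locally. With round-robin placement, co-location happens only by coincidence.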

Considerations

Consider the following details while using Query profiler:

  1. Query profiler displays information returned by the SYS_QUERY_HISTORY, SYS_QUERY_EXPLAIN, SYS_QUERY_DETAIL, and SYS_CHILD_QUERY_TEXT views.
  2. Query profiler only displays query information for queries that have recently run on the database. If a query completes using a prepopulated resultset cache, Query profiler won’t have information about it because Amazon Redshift doesn’t generate a query plan for such queries.
  3. The queries that Query profiler runs to return query information execute on the same data warehouse as the user-defined queries.

Clean Up

To avoid unexpected costs, complete the following action to delete the resources you created:

Drop all the tables in the tpcds schema of the sample_data_dev database.

Conclusion

In this post, we discussed how to use Amazon Redshift Query profiler to monitor and troubleshoot long-running queries. We demonstrated a step-by-step approach to analyze query performance by examining the query execution plan and statistics and identifying the root cause of query slowness. Try this feature in your environment and share your feedback with us.


About the Authors

Raks Khare is a Senior Analytics Specialist Solutions Architect at AWS based out of Pennsylvania. He helps customers across varying industries and regions architect data analytics solutions at scale on the AWS platform. Outside of work, he likes exploring new travel and food destinations and spending quality time with his family.

Blessing Bamiduro is part of the Amazon Redshift Product Management team. She works with customers to help explore the use of Amazon Redshift ML in their data warehouse. In her spare time, Blessing loves travel and adventure.

Ekta Ahuja is an Amazon Redshift Specialist Solutions Architect at AWS. She is passionate about helping customers build scalable and robust data and analytics solutions. Before AWS, she worked in several different data engineering and analytics roles. Outside of work, she enjoys landscape photography, traveling, and board games.

Accelerate Amazon Redshift Data Lake queries with AWS Glue Data Catalog Column Statistics

Post Syndicated from Kalaiselvi Kamaraj original https://aws.amazon.com/blogs/big-data/accelerate-amazon-redshift-data-lake-queries-with-column-level-statistics/

Amazon Redshift enables you to efficiently query and retrieve structured and semi-structured data from open format files in your Amazon S3 data lake without having to load the data into Amazon Redshift tables. Amazon Redshift extends SQL capabilities to your data lake, enabling you to run analytical queries. Amazon Redshift supports a wide variety of tabular data formats like CSV, JSON, Parquet, and ORC, and open table formats like Apache Hudi, Linux Foundation Delta Lake, and Apache Iceberg.

You create Redshift external tables by defining the structure for your files, S3 location of the files and registering them as tables in an external data catalog. The external data catalog can be AWS Glue Data Catalog, the data catalog that comes with Amazon Athena, or your own Apache Hive metastore.

Over the last year, Amazon Redshift added several performance optimizations for data lake queries across multiple areas of the query engine, such as rewrite, planning, scan execution, and consumption of AWS Glue Data Catalog column statistics. To get the best performance on data lake queries with Redshift, you can use AWS Glue Data Catalog’s column statistics feature to collect statistics on data lake tables. For Amazon Redshift Serverless instances, you will see improved scan performance through increased parallel processing of S3 files; this happens automatically based on the RPUs used.

In this post, we highlight the performance improvements we observed using industry standard TPC-DS benchmarks. Overall execution time of TPC-DS 3 TB benchmark improved by 3x. Some of the queries in our benchmark experienced up to 12x speed up.

Performance Improvements

Several optimizations were made over the last year to improve the performance of data lake queries, including the following:

  • Consumption of AWS Glue Data Catalog column statistics and tuning of the Redshift optimizer to improve the quality of query plans
  • Bloom filters for partition columns
  • Improved scan efficiency for Amazon Redshift Serverless instances through increased parallel processing of files
  • New query rewrite rules to merge similar scans
  • Faster retrieval of metadata from AWS Glue Data Catalog

To understand the performance gains, we tested on the industry-standard TPC-DS benchmark using 3 TB data sets and queries that represent different customer use cases. Performance was tested on a Redshift Serverless data warehouse with 128 RPUs. In our testing, the dataset was stored in Amazon S3 in Parquet format and AWS Glue Data Catalog was used to manage external databases and tables. Fact tables were partitioned on the date column, and each fact table consisted of approximately 2,000 partitions. All of the tables had their row count table property, numRows, set as per the Amazon Redshift Spectrum query performance guidelines.

We did a baseline run on the Redshift patch version from last year (patch 172). Later, we ran all TPC-DS queries on the latest patch version (patch 180), which includes all the performance optimizations added over the last year. Then we used AWS Glue Data Catalog’s column statistics feature to compute statistics for all the tables and measured the improvements in the presence of AWS Glue Data Catalog column statistics.

Our analysis revealed that the TPC-DS 3 TB Parquet benchmark saw substantial performance gains with these optimizations. Specifically, partitioned Parquet with our latest optimizations achieved 2x faster runtimes compared to the previous implementation. With AWS Glue Data Catalog column statistics enabled, runtimes were 3x faster than last year. The following graph illustrates these runtime improvements for the full benchmark (all TPC-DS queries) over the past year, including the additional boost from using AWS Glue Data Catalog column statistics.

Figure 1: Improvement in total runtime of TPC-DS 3T workload

The following graph presents the TPC-DS queries with the greatest performance improvement over the last year, with and without AWS Glue Data Catalog column statistics. You can see that performance improves substantially when statistics exist in AWS Glue Data Catalog (for details on how to get statistics for your data lake tables, refer to optimizing query performance using AWS Glue Data Catalog column statistics). Multi-join queries benefit the most from AWS Glue Data Catalog column statistics because the optimizer uses statistics to choose the right join order and distribution strategy.

Figure 2: Speed-up in TPC-DS queries

Let’s discuss some of the optimizations that contributed to improved query performance.

Optimizing with table-level statistics

Amazon Redshift’s design enables it to handle large-scale data challenges with superior speed and cost-efficiency. Its massively parallel processing (MPP) query engine, AI-powered query optimizer, auto-scaling capabilities, and other advanced features allow Redshift to excel at searching, aggregating, and transforming petabytes of data.

However, even the most powerful systems can experience performance degradation if they encounter anti-patterns like grossly inaccurate table statistics, such as the row count metadata.

Without this crucial metadata, Redshift’s query optimizer may be limited in the number of possible optimizations, especially those related to data distribution during query execution. This can have a significant impact on overall query performance.

To illustrate this, consider the following simple query involving an inner join between a large table with billions of rows and a small table with only a few hundred thousand rows.

select small_table.sellerid, sum(large_table.qtysold)
from large_table, small_table
where large_table.salesid = small_table.listid
 and small_table.listtime > '2023-12-01'
 and large_table.saletime > '2023-12-01'
group by 1 order by 1

If executed as-is, with the large table on the right-hand side of the join, the query will lead to sub-optimal performance. This is because the large table will need to be distributed (broadcast) to all Redshift compute nodes to perform the inner join with the small table, as shown in the following diagram.

Figure 3: Inaccurate table statistics lead to limited optimizations and large amounts of data broadcast among compute nodes for a simple inner join

Now, consider a scenario where the table statistics, such as the row count, are accurate. This allows the Amazon Redshift query optimizer to make more informed decisions, such as determining the optimal join order. In this case, the optimizer would immediately rewrite the query to have the large table on the left-hand side of the inner join, so that it is the small table that is broadcast across the Redshift compute nodes, as illustrated in the following diagram.

Figure 4: Accurate table statistics lead to high degree of optimizations and very little data broadcast among compute nodes for a simple inner join
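
The difference between the two plans can be approximated with simple arithmetic. The following sketch is a toy cost model, not Amazon Redshift's actual cost function: it assumes the broadcast table starts out spread evenly across the compute nodes, and the row counts and node count are hypothetical values chosen to match the "billions" and "few hundred thousand" rows described above.

```python
def rows_sent_when_broadcast(table_rows: int, compute_nodes: int) -> int:
    """Toy cost model: the table is spread evenly across the nodes, so each
    node must receive the share held by every other node."""
    return table_rows * (compute_nodes - 1)


LARGE_TABLE_ROWS = 2_000_000_000  # hypothetical "billions of rows"
SMALL_TABLE_ROWS = 300_000        # hypothetical "few hundred thousand rows"
COMPUTE_NODES = 4                 # hypothetical cluster size

broadcast_large = rows_sent_when_broadcast(LARGE_TABLE_ROWS, COMPUTE_NODES)
broadcast_small = rows_sent_when_broadcast(SMALL_TABLE_ROWS, COMPUTE_NODES)

print(f"broadcast large table: {broadcast_large:,} rows moved")
print(f"broadcast small table: {broadcast_small:,} rows moved")
```

Under this model, the optimizer can only pick the cheaper side to broadcast if it knows which table is smaller, which is why an accurate row count matters.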

Fortunately, Amazon Redshift automatically maintains accurate table statistics for local tables by running the ANALYZE command in the background. For external tables (data lake tables), however, AWS Glue Data Catalog column statistics are recommended for use with Amazon Redshift as we will discuss in the next section. For more general information on optimizing queries in Amazon Redshift, please refer to the documentation on factors affecting query performance, data redistribution, and Amazon Redshift best practices for designing queries.

Improvements with AWS Glue Data Catalog column statistics

AWS Glue Data Catalog has a feature to compute column-level statistics for external tables backed by Amazon S3. It can compute statistics such as the number of distinct values (NDV), number of nulls, minimum and maximum values, and average column width, without the need for additional data pipelines. The Amazon Redshift cost-based optimizer uses these statistics to produce better-quality query plans. In addition to consuming statistics, we also made several improvements in cardinality estimation and cost tuning to get high-quality query plans, thereby improving query performance.
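
To make these statistics concrete, the following sketch computes them for an in-memory column and feeds the NDV into a join-size estimate. Everything here is illustrative: the ColumnStats class and function names are hypothetical, string length is used as a stand-in for column width, and the estimate is the textbook equi-join formula (|L| × |R| / max(NDV_L, NDV_R)), not necessarily the formula the Amazon Redshift optimizer uses.

```python
from dataclasses import dataclass


@dataclass
class ColumnStats:
    ndv: int          # number of distinct non-null values
    null_count: int
    min_value: object
    max_value: object
    avg_width: float  # here: average length of the value as text


def compute_stats(values):
    """Compute column-level statistics for a list of values (None = NULL)."""
    non_null = [v for v in values if v is not None]
    return ColumnStats(
        ndv=len(set(non_null)),
        null_count=len(values) - len(non_null),
        min_value=min(non_null),
        max_value=max(non_null),
        avg_width=sum(len(str(v)) for v in non_null) / len(non_null),
    )


def estimate_join_rows(left_rows, left_ndv, right_rows, right_ndv):
    """Textbook equi-join estimate: |L| * |R| / max(NDV_L, NDV_R)."""
    return left_rows * right_rows // max(left_ndv, right_ndv)


stats = compute_stats([1, 2, 2, None, 5])
print(stats)

# Hypothetical table sizes and NDVs for the join columns.
print(estimate_join_rows(left_rows=1000, left_ndv=100,
                         right_rows=50, right_ndv=50))
```

Without an NDV, an optimizer has to guess the denominator of such an estimate, and a bad guess propagates through every join above it in the plan.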

The TPC-DS 3 TB dataset showed a 40% improvement in total query execution time when AWS Glue Data Catalog column statistics were provided. Individual TPC-DS queries showed up to 5x improvements in query execution time. Some of the queries with the greatest improvement in execution time are Q85, Q64, Q75, Q78, Q94, Q16, Q04, Q24, and Q11.

We will go through an example where the cost-based optimizer generated a better query plan with statistics, and show how it improved the execution time.

Let’s consider the following simplified version of TPC-DS Q64 to showcase the query plan differences with statistics.

select i_product_name product_name
,i_item_sk item_sk
,ad1.ca_street_number b_street_number
,ad1.ca_street_name b_street_name
,ad1.ca_city b_city
,ad1.ca_zip b_zip
,d1.d_year as syear
,count(*) cnt
,sum(ss_wholesale_cost) s1
,sum(ss_list_price) s2
,sum(ss_coupon_amt) s3
FROM   tpcds_3t_alls3_pp_ext.store_sales
,tpcds_3t_alls3_pp_ext.store_returns
,tpcds_3t_alls3_pp_ext.date_dim d1
,tpcds_3t_alls3_pp_ext.customer
,tpcds_3t_alls3_pp_ext.customer_address ad1
,tpcds_3t_alls3_pp_ext.item
WHERE
ss_sold_date_sk = d1.d_date_sk AND
ss_customer_sk = c_customer_sk AND
ss_addr_sk = ad1.ca_address_sk and
ss_item_sk = i_item_sk and
ss_item_sk = sr_item_sk and
ss_ticket_number = sr_ticket_number and
i_color in ('firebrick','papaya','orange','cream','turquoise','deep') and
i_current_price between 42 and 42 + 10 and
i_current_price between 42 + 1 and 42 + 15
group by i_product_name
,i_item_sk
,ad1.ca_street_number
,ad1.ca_street_name
,ad1.ca_city
,ad1.ca_zip
,d1.d_year

Without Statistics

The following figure shows the logical query plan of Q64 without statistics. The cardinality estimates for the joins are inaccurate, and with inaccurate cardinalities the optimizer produces a suboptimal query plan, which leads to a higher execution time.

Figure 5: Logical query plan of Q64 without statistics

With Statistics

The following figure shows the logical query plan after consuming AWS Glue Data Catalog column statistics. Based on the highlighted changes, you can observe that the join cardinality estimates improved by several orders of magnitude, which helps the optimizer choose a better join order and join strategy (broadcast DS_BCAST_INNER vs. distribute DS_DIST_BOTH). Switching the customer_address and customer tables from inner to outer tables and changing the join strategy to distribute has a major impact, because it reduces data movement between the nodes and avoids spilling from the hash table.

Figure 6: Logical query plan of Q64 after consuming AWS Glue Data Catalog column statistics

This change in query plan improved the query execution time of Q64 from 383s to 81s.

Given how much the optimizer benefits from AWS Glue Data Catalog column statistics, you should consider collecting statistics for your data lake using AWS Glue. Join-heavy workloads will show the greatest improvement. Refer to generating AWS Glue Data Catalog column statistics for instructions on how to collect statistics in AWS Glue Data Catalog.

Query rewrite optimization

We introduced a new query rewrite rule which combines scalar aggregates over the same common expression using slightly different predicates. This rewrite resulted in performance improvements on TPC-DS queries Q09, Q28, and Q88. Let’s focus on Q09 as a representative of these queries, given by the following fragment:

SELECT CASE
WHEN (SELECT COUNT(*)
FROM store_sales
WHERE ss_quantity BETWEEN 1 AND 20) > 48409437
THEN (SELECT AVG(ss_ext_discount_amt)
FROM store_sales
WHERE ss_quantity BETWEEN 1 AND 20)
ELSE (SELECT AVG(ss_net_profit)
FROM store_sales
WHERE ss_quantity BETWEEN 1 AND 20) END
AS bucket1,
<<4 more variations of the CASE expression above>>
FROM reason
WHERE r_reason_sk = 1

In total, there are 15 scans of the fact table store_sales, each one returning various aggregates over different subsets of data. The engine first performs subquery removal and transforms the various expressions in the CASE statements into relational subtrees connected via cross products, and then they are fused into one subquery handling all scalar aggregates. The resulting plan for Q09, described below using SQL for clarity, is given by:

SELECT CASE WHEN v1 > 48409437 THEN t1 ELSE e1 END,
<4 more variations>
FROM (SELECT COUNT(CASE WHEN b1 THEN 1 END) AS v1,
AVG(CASE WHEN b1 THEN ss_ext_discount_amt END) AS t1,
AVG(CASE WHEN b1 THEN ss_net_profit END) AS e1,
<4 more variations>
FROM reason,
(SELECT *,
ss_quantity BETWEEN 1 AND 20 AS b1,
<4 more variations>
FROM store_sales
WHERE ss_quantity BETWEEN 1 AND 20 OR
<4 more variations>))
WHERE r_reason_sk = 1)

In general, this rewrite rule results in the largest improvements both in latency (from 3x to 8x improvements) and bytes read from Amazon S3 (from 6x to 8x reduction in scanned bytes and, consequently, cost).
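
The equivalence of the two query shapes can be checked on a small scale. The following sketch uses Python's built-in sqlite3 module rather than Amazon Redshift, with made-up data, only two of the five buckets, and a hypothetical threshold of 10 standing in for 48409437. It shows only that the fused form returns the same result from a single scan of store_sales; it says nothing about how the engine performs the rewrite internally.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE store_sales (ss_quantity INTEGER,
                              ss_ext_discount_amt REAL,
                              ss_net_profit REAL);
    CREATE TABLE reason (r_reason_sk INTEGER);
    INSERT INTO reason VALUES (1);
""")
# Made-up rows; quantities 41-60 fall outside both buckets.
conn.executemany("INSERT INTO store_sales VALUES (?, ?, ?)",
                 [(q, q * 1.5, q * -0.5) for q in range(1, 61)])

# Original shape: three scalar subqueries (three scans) per bucket.
original = conn.execute("""
    SELECT
      CASE WHEN (SELECT COUNT(*) FROM store_sales
                 WHERE ss_quantity BETWEEN 1 AND 20) > 10
           THEN (SELECT AVG(ss_ext_discount_amt) FROM store_sales
                 WHERE ss_quantity BETWEEN 1 AND 20)
           ELSE (SELECT AVG(ss_net_profit) FROM store_sales
                 WHERE ss_quantity BETWEEN 1 AND 20) END AS bucket1,
      CASE WHEN (SELECT COUNT(*) FROM store_sales
                 WHERE ss_quantity BETWEEN 21 AND 40) > 10
           THEN (SELECT AVG(ss_ext_discount_amt) FROM store_sales
                 WHERE ss_quantity BETWEEN 21 AND 40)
           ELSE (SELECT AVG(ss_net_profit) FROM store_sales
                 WHERE ss_quantity BETWEEN 21 AND 40) END AS bucket2
    FROM reason
    WHERE r_reason_sk = 1
""").fetchone()

# Fused shape: one scan, with conditional aggregates per bucket.
fused = conn.execute("""
    SELECT CASE WHEN v1 > 10 THEN t1 ELSE e1 END AS bucket1,
           CASE WHEN v2 > 10 THEN t2 ELSE e2 END AS bucket2
    FROM (SELECT COUNT(CASE WHEN b1 THEN 1 END) AS v1,
                 AVG(CASE WHEN b1 THEN ss_ext_discount_amt END) AS t1,
                 AVG(CASE WHEN b1 THEN ss_net_profit END) AS e1,
                 COUNT(CASE WHEN b2 THEN 1 END) AS v2,
                 AVG(CASE WHEN b2 THEN ss_ext_discount_amt END) AS t2,
                 AVG(CASE WHEN b2 THEN ss_net_profit END) AS e2
          FROM reason,
               (SELECT *,
                       ss_quantity BETWEEN 1 AND 20 AS b1,
                       ss_quantity BETWEEN 21 AND 40 AS b2
                FROM store_sales
                WHERE ss_quantity BETWEEN 1 AND 20
                   OR ss_quantity BETWEEN 21 AND 40) AS s
          WHERE r_reason_sk = 1) AS agg
""").fetchone()

print(original, fused)
```

The conditional aggregates work because COUNT and AVG skip NULLs, so a CASE expression without an ELSE branch excludes rows outside the bucket.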

Bloom filter for partition columns

Amazon Redshift already uses Bloom filters on data columns of external tables in Amazon S3 to enable early and effective data filtering. Last year, we extended this support to partition columns as well. A Bloom filter is a probabilistic, memory-efficient data structure that accelerates join queries at scale by filtering rows that do not match the join relation, significantly reducing the amount of data transferred over the network. Amazon Redshift automatically determines which queries are suitable for Bloom filters at query runtime.

This optimization improved performance on TPC-DS queries Q05, Q17, and Q54, with large gains in both latency (from 2x to 3x improvement) and bytes read from S3 (from 9x to 15x reduction in scanned bytes and, consequently, cost).

The following subquery of Q05 showed improvements with the runtime filter.

select s_store_id,
sum(sales_price) as sales,
sum(profit) as profit,
sum(return_amt) as returns,
sum(net_loss) as profit_loss
from
( select  ss_store_sk as store_sk,
ss_sold_date_sk  as date_sk,
ss_ext_sales_price as sales_price,
ss_net_profit as profit,
cast(0 as decimal(7,2)) as return_amt,
cast(0 as decimal(7,2)) as net_loss
from tpcds_3t_alls3_pp_ext.store_sales
union all
select sr_store_sk as store_sk,
sr_returned_date_sk as date_sk,
cast(0 as decimal(7,2)) as sales_price,
cast(0 as decimal(7,2)) as profit,
sr_return_amt as return_amt,
sr_net_loss as net_loss
from tpcds_3t_alls3_pp_ext.store_returns
) salesreturnss,
tpcds_3t_alls3_pp_ext.date_dim,
tpcds_3t_alls3_pp_ext.store
where date_sk = d_date_sk
and d_date between cast('1998-08-13' as date)
and (cast('1998-08-13' as date) +  14)
and store_sk = s_store_sk
group by s_store_id

Without bloom filter support on partition columns

The following figure shows the logical query plan for the subquery of Q05. The plan appends two large fact tables, store_sales (8B rows) and store_returns (863M rows), joins the result with the very selective dimension table date_dim, and then joins with the dimension table store. You can observe that the join with the date_dim table reduces the number of rows from 9B to 93M.

Figure 7: Logical query plan for sub-query of Q05 without bloom filter support on partition columns

With bloom filter support on partition columns

With Bloom filter support on partition columns, we now create a Bloom filter for the d_date_sk column of the date_dim table and push it down to the store_sales and store_returns tables. Because the join happens on the partition column, these Bloom filters help filter out partitions in both store_sales and store_returns (the number of partitions processed drops by 10x).

Figure 8: Logical query plan for sub-query of Q05 with bloom filter support on partition columns

Overall, a Bloom filter on the partition column reduces the number of partitions processed, resulting in fewer S3 listing calls and fewer data files to read (a reduction in scanned bytes). You can see that we only scan 89M rows from store_sales and 4M rows from store_returns because of the Bloom filter. This reduced the number of rows to process at the join level, improving overall query performance by 2x and reducing scanned bytes by 9x.
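
The idea of pruning partitions with a Bloom filter can be sketched in a few lines. This is an illustration only, not Amazon Redshift's implementation: the BloomFilter class, its sizing, the SHA-256-based hashing, and the date key values are all hypothetical. The filter is built from the 15 date keys that would survive the date_dim predicate, then used to decide which of 2,000 fact-table partitions need to be scanned at all.

```python
import hashlib


class BloomFilter:
    """Minimal Bloom filter: no false negatives, occasional false positives."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for clarity

    def _positions(self, value):
        # Derive several bit positions by salting one hash with a seed.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{value}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, value):
        for pos in self._positions(value):
            self.bits[pos] = 1

    def might_contain(self, value):
        return all(self.bits[pos] for pos in self._positions(value))


# Hypothetical date keys selected by the date_dim predicate (15 days).
matching_date_keys = range(2451039, 2451054)
bloom = BloomFilter()
for key in matching_date_keys:
    bloom.add(key)

# Hypothetical fact-table partitions, one per date key value.
partitions = range(2450000, 2452000)
partitions_to_scan = [p for p in partitions if bloom.might_contain(p)]

print(f"{len(partitions_to_scan)} of {len(partitions)} partitions scanned")
```

Because a Bloom filter never produces false negatives, every partition that can contain matching rows is still scanned; a false positive only costs an unnecessary scan, never a wrong result.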

Conclusion

In this post, we covered new performance optimizations in Amazon Redshift data lake query processing and how AWS Glue Data Catalog statistics help enhance the quality of query plans for data lake queries in Amazon Redshift. Together, these optimizations improved the TPC-DS 3 TB benchmark by 3x. Some of the queries in our benchmark saw up to a 12x speedup.

In summary, Amazon Redshift now offers enhanced query performance with optimizations such as AWS Glue Data Catalog column statistics, Bloom filters on partition columns, new query rewrite rules, and faster retrieval of metadata. These optimizations are enabled by default, and Amazon Redshift users will benefit from better query response times for their workloads. For more information, reach out to your AWS technical account manager or AWS account solutions architect. They will be happy to provide additional guidance and support.


About the authors

Kalaiselvi Kamaraj is a Sr. Software Development Engineer with Amazon. She has worked on several projects within the Redshift Query processing team and is currently focusing on performance-related projects for Redshift Data Lake.

Mark Lyons is a Principal Product Manager on the Amazon Redshift team. He works on the intersection of data lakes and data warehouses. Prior to joining AWS, Mark held product leadership roles with Dremio and Vertica. He is passionate about data analytics and empowering customers to change the world with their data.

Asser Moustafa is a Principal Worldwide Specialist Solutions Architect at AWS, based in Dallas, Texas, USA. He partners with customers worldwide, advising them on all aspects of their data architectures, migrations, and strategic data visions to help organizations adopt cloud-based solutions, maximize the value of their data assets, modernize legacy infrastructures, and implement cutting-edge capabilities like machine learning and advanced analytics. Prior to joining AWS, Asser held various data and analytics leadership roles, completing an MBA from New York University and an MS in Computer Science from Columbia University in New York. He is passionate about empowering organizations to become truly data-driven and unlock the transformative potential of their data.

Data engineering professional certificate: New hands-on specialization by DeepLearning.AI and AWS

Post Syndicated from Betty Zheng (郑予彬) original https://aws.amazon.com/blogs/aws/data-engineering-professional-certificate-new-hands-on-specialization-by-deeplearning-ai-and-aws/

Data engineers play a crucial role in the modern data-driven landscape, managing essential tasks from data ingestion and processing to transformation and serving. Their expertise is particularly valuable in the era of generative AI, where harnessing the value of vast datasets is paramount.

To empower aspiring and experienced data professionals, DeepLearning.AI and Amazon Web Services (AWS) have partnered to launch the Data Engineering Specialization, an advanced professional certificate on Coursera. This comprehensive program covers a wide range of data engineering concepts, tools, and techniques relevant to modern organizations. It’s designed for learners with some experience working with data who are interested in learning the fundamentals of data engineering. The specialization comprises four hands-on courses, each culminating in a Coursera course certificate upon completion.

Specialization overview

This Data Engineering Specialization is a joint initiative by AWS and DeepLearning.AI, a leading provider of world-class AI education founded by renowned machine learning (ML) pioneer Andrew Ng.

Joe Reis, a prominent figure in data engineering and coauthor of the bestselling book Fundamentals of Data Engineering, leads the program as a primary instructor. By providing a foundational framework, the curriculum ensures learners gain a holistic understanding of the data engineering lifecycle, while covering key aspects such as data architecture, orchestration, DataOps, and data management.

Further enhancing the learning experience, the program features hands-on labs and technical assessments hosted on the AWS Cloud. These practical, cloud-based exercises were designed in partnership with AWS technical experts, including Gal Heyne, Navnit Shukla, and Morgan Willis. Learners will apply theoretical concepts using AWS services and tools, such as Amazon Kinesis, AWS Glue, Amazon Simple Storage Service (Amazon S3), and Amazon Redshift, equipping them with hands-on skills and experience.

Specialization highlights

Participants will be introduced to several key learning opportunities.

Acquisition of core skills and strategies

The specialization equips data engineers with the ability to design data engineering solutions for various use cases, select the right technologies for their data architecture, and circumvent potential pitfalls. The skills gained universally apply across various platforms and technologies, offering learners a program that is versatile.

Unparalleled approach to data engineering education

Unlike conventional courses focused on specific technologies, this specialization provides a comprehensive understanding of data engineering fundamentals. It emphasizes the importance of aligning data engineering strategies with broader business goals, fostering a more integrated and effective approach to building and maintaining data solutions.

Holistic understanding of data engineering

By using the insights from the Fundamentals of Data Engineering book, the curriculum offers a well-rounded education that prepares professionals for success in data-driven industries.

Practical skills through AWS cloud labs

The hands-on labs hosted by AWS Partner Vocareum let learners apply the techniques directly in an AWS environment provided with the course. This practical experience is crucial for mastering the intricacies of data engineering and developing the skills needed to excel in the industry.

Why choose this specialization?

  • Structured learning path–The specialization is thoughtfully structured to provide a step-by-step learning journey, from foundational concepts to advanced applications.
  • Expert insights–Gain insights from the authors of Fundamentals of Data Engineering and other industry experts. Learn how to apply practical knowledge to build modern data architecture on the cloud, using cloud services for data engineering.
  • Hands-on experience–Engage in hands-on labs in the AWS Cloud, where you not only learn but also apply the knowledge in real-world scenarios.
  • Comprehensive curriculum–This program encompasses all aspects of the data engineering lifecycle, including data generation in source systems, ingestion, transformation, storage, and serving. It also addresses key undercurrents of data engineering, such as security, data management, and orchestration.

At the end of this specialization, learners will be well-equipped with the necessary skills and expertise to embark on a career in data engineering, an in-demand role at the core of any organization that is looking to use data to create value. Data-centric ML and analytics would not be possible without the foundation of data engineering.

Course modules

The Data Engineering Specialization comprises four courses:

  • Course 1–Introduction to Data Engineering–This foundational module explores the collaborative nature of data engineering, identifying key stakeholders and understanding their requirements. The course delves into a mental framework for building data engineering solutions, emphasizing holistic ecosystem understanding, critical factors like data quality and scalability, and effective requirements gathering. The course then examines the data engineering lifecycle, illustrating interconnections between stages. By showcasing the AWS data engineering stack, the course teaches how to use the right technologies. By the end of this course, learners will have the skills and mindset to tackle data engineering challenges and make informed decisions.
  • Course 2–Source Systems, Data Ingestion, and Pipelines–In this course, data engineers dive deep into the practical aspects of working with diverse data sources, ingestion patterns, and pipeline construction. Learners explore the characteristics of different data formats and the appropriate source systems for generating each type of data, equipping them with the knowledge to design effective data pipelines. The course covers the fundamentals of relational and NoSQL databases, including ACID compliance and CRUD operations, so that engineers learn to interact with a wide range of data source systems. The course covers the significance of cloud networking, resolving database connection issues, and using message queues and streaming platforms—crucial skills for creating strong and scalable data architectures. By mastering the concepts in this course, data engineers will be able to automate data ingestion processes, optimize connectivity, and establish the foundation for successful data engineering projects.
  • Course 3–Data Storage and Queries–This course equips data engineers with principles and best practices for designing robust, efficient data storage and querying solutions. Learners explore the data lake house concept, implementing a medallion-like architecture and using open table formats to build transactional data lakes. The course enhances SQL proficiency by teaching advanced queries, such as aggregations and joins on streaming data, while also exploring data warehouse and data lake capabilities. Learners compare storage performance and discover optimization strategies, like indexing. Data engineers can achieve high performance and scalability in data services by comprehending query execution and processing.
  • Course 4–Data Modeling, Transformation, and Serving–In this capstone course, data engineers explore advanced data modeling techniques, including data vault and star schemas. Learners differentiate between modeling approaches like Inmon and Kimball, gaining the ability to transform data and structure it for optimal analytical and ML use cases. The course equips data engineers with preprocessing skills for textual, image, and tabular data. Learners understand the distinctions between supervised and unsupervised learning, as well as classification and regression tasks, empowering them to design data solutions supporting a range of predictive applications. By mastering these data modeling, transformation, and serving concepts, data engineers can build robust, scalable, and business-aligned data architectures to deliver maximum value.

Enrollment

Whether you’re new to data engineering or looking to enhance your skills, this specialization provides a balanced mix of theory and hands-on experience through four courses, each culminating in a Coursera course certificate.

Embark on your data engineering journey from here:

By enrolling in these courses, you’ll also earn the DeepLearning.AI Data Engineering Professional Certificate upon completing all four courses.

Enroll now and take the first step towards mastering data engineering with this comprehensive and practical program, built on the foundation of Fundamentals of Data Engineering and powered by AWS.

How HPE Aruba Supply Chain optimized cost and performance by migrating to an AWS modern data architecture

Post Syndicated from Hardeep Randhawa original https://aws.amazon.com/blogs/big-data/how-hpe-aruba-supply-chain-optimized-cost-and-performance-by-migrating-to-an-aws-modern-data-architecture/

This blog post is co-written with Hardeep Randhawa and Abhay Kumar from HPE.

HPE Aruba Networking, formerly known as Aruba Networks, is a Santa Clara, California-based security and networking subsidiary of Hewlett Packard Enterprise. HPE Aruba Networking is the industry leader in wired, wireless, and network security solutions. Hewlett-Packard acquired Aruba Networks in 2015, making it a wireless networking subsidiary with a wide range of next-generation network access solutions.

Aruba offers networking hardware like access points, switches, routers, software, security devices, and Internet of Things (IoT) products. Their large inventory requires extensive supply chain management to source parts, make products, and distribute them globally. This complex process involves suppliers, logistics, quality control, and delivery.

This post describes how HPE Aruba automated their Supply Chain management pipeline, and re-architected and deployed their data solution by adopting a modern data architecture on AWS.

Challenges with the on-premises solution

As demand surged over time, it was imperative that Aruba build a sophisticated and powerful supply chain solution that could help them scale operations, enhance visibility, improve predictability, elevate customer experience, and drive sustainability. To achieve their vision of a modern, scalable, resilient, secure, and cost-efficient architecture, they chose AWS as their trusted partner due to the range of low-cost, scalable, and reliable cloud services it offers.

Through a commitment to cutting-edge technologies and a relentless pursuit of quality, HPE Aruba designed this next-generation solution as a cloud-based cross-functional supply chain workflow and analytics tool. The application supports custom workflows to allow demand and supply planning teams to collaborate, plan, source, and fulfill customer orders, then track fulfillment metrics via persona-based operational and management reports and dashboards. This also includes building an industry standard integrated data repository as a single source of truth, operational reporting through real time metrics, data quality monitoring, 24/7 helpdesk, and revenue forecasting through financial projections and supply availability projections. Overall, this new solution has empowered HPE teams with persona-based access to 10 full-scale business intelligence (BI) dashboards and over 350 report views across demand and supply planning, inventory and order management, SKU dashboards, deal management, case management, backlog views, and big deal trackers.

Overview of the solution

This post describes how HPE Aruba automated their supply chain management pipeline, starting from data migration from varied data sources into a centralized Amazon Simple Storage Service (Amazon S3) based storage to building their data warehouse on Amazon Redshift with the publication layer built on a third-party BI tool and user interface using ReactJS.

The following diagram illustrates the solution architecture.


In the following sections, we go through the key components in the diagram in more detail:

  1. Source systems
  2. Data migration
  3. Regional distribution
  4. Orchestration
  5. File processing
  6. Data quality checks
  7. Archiving processed files
  8. Copying to Amazon Redshift
  9. Running stored procedures
  10. UI integration
  11. Code Deployment
  12. Security & Encryption
  13. Data Consumption
  14. Final Steps

1. Source systems

Aruba’s source repository includes data from three operating regions (AMER, EMEA, and APJ), along with one worldwide (WW) data pipeline from varied sources like SAP S/4 HANA, Salesforce, Enterprise Data Warehouse (EDW), Enterprise Analytics Platform (EAP), SharePoint, and more. The data sources include 150+ files, including 10-15 mandatory files per region, ingested in various formats like xlsx, csv, and dat. Aruba’s data governance guidelines required that they use a single centralized tool that could securely and cost-effectively review all source files with multiple formats, sizes, and ingestion times for compliance before exporting them out of the HPE environment. To achieve this, Aruba first copied the respective files to a centralized on-premises staging layer.

2. Data migration

Aruba chose AWS Transfer Family for SFTP for secure and efficient file transfers from an on-premises staging layer to an Amazon S3 based landing zone. AWS Transfer Family seamlessly integrates with other AWS services, automates transfer, and makes sure data is protected with encryption and access controls. To prevent duplication issues and maintain data integrity, Aruba customized these data transfer jobs to make sure previous transfers are complete before copying the next set of files.

3. Regional distribution

On average, Aruba transfers approximately 100 files, totaling 1.5–2 GB, into the landing zone daily. The data volume increases each Monday with the weekly file loads and at the beginning of each month with the monthly file loads. These files follow the same naming pattern, with a daily system-generated timestamp appended to each file name. Each file arrives as a pair with a tail metadata file in CSV format containing the size and name of the file. This metadata file is later used to read source file names during processing into the staging layer.

The source data contains files from three different operating Regions and one worldwide pipeline that needs to be processed per local time zones. Therefore, separating the files and running a distinct pipeline for each was necessary to decouple and enhance failure tolerance. To achieve this, Aruba used Amazon S3 Event Notifications. With each file uploaded to Amazon S3, an Amazon S3 PUT event invokes an AWS Lambda function that distributes the source and the metadata files Region-wise and loads them into the respective Regional landing zone S3 bucket. To map the file with the respective Region, this Lambda function uses Region-to-file mapping stored in a configuration table in Amazon Aurora PostgreSQL-Compatible Edition.
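The routing step described above can be sketched as a small lookup. This is a minimal illustration, not Aruba's Lambda code: the mapping entries, the `_YYYYMMDD` timestamp suffix, and the `landing-<region>/` destination prefix are all assumptions made for the example, and the real mapping lives in an Aurora PostgreSQL configuration table whose schema the post does not describe.

```python
import re
from typing import Optional

# Hypothetical Region-to-file mapping. In the post this is stored in an
# Aurora PostgreSQL configuration table; these entries are made up.
REGION_BY_FILE = {
    "amer_orders": "AMER",
    "emea_orders": "EMEA",
    "apj_orders": "APJ",
    "ww_backlog": "WW",
}

# Assumed shape of the system-generated timestamp: _YYYYMMDD before the extension.
TIMESTAMP_SUFFIX = re.compile(r"_\d{8}(?=\.[^.]+$)")


def base_name(key: str) -> str:
    """Strip the folder path, timestamp suffix, and extension from an S3 key."""
    file_name = key.rsplit("/", 1)[-1]
    without_timestamp = TIMESTAMP_SUFFIX.sub("", file_name)
    return without_timestamp.rsplit(".", 1)[0]


def regional_destination(key: str) -> Optional[str]:
    """Return the Regional landing key for a file, or None if it is unmapped."""
    region = REGION_BY_FILE.get(base_name(key))
    if region is None:
        return None
    file_name = key.rsplit("/", 1)[-1]
    return f"landing-{region.lower()}/{file_name}"
```

In a Lambda handler, a function like `regional_destination` would be called with the object key from the S3 PUT event, and unmapped files (a `None` result) would need their own handling.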

4. Orchestration

The next requirement was to set up orchestration for the data pipeline to seamlessly implement the required logic on the source files to extract meaningful data. Aruba chose AWS Step Functions for orchestrating and automating their extract, transform, and load (ETL) processes to run on a fixed schedule. In addition, they use AWS Glue jobs for orchestrating validation jobs and moving data through the data warehouse.

They used Step Functions with Lambda and AWS Glue for automated orchestration to minimize the cloud solution deployment timeline by reusing the on-premises code base, where possible. The prior on-premises data pipeline was orchestrated using Python scripts. Therefore, integrating the existing scripts with Lambda within Step Functions and AWS Glue helped accelerate their deployment timeline on AWS.

5. File processing

With each pipeline running at 5:00 AM local time, the data is further validated, processed, and then moved to the processing zone folder in the same S3 bucket. Unsuccessful file validation results in the source files being moved to the reject zone S3 bucket directory. The following file validations are run by the Lambda functions invoked by the Step Functions workflow:

  • The Lambda function validates if the tail file is available with the corresponding source data file. When each complete file pair lands in the Regional landing zone, the Step Functions workflow considers the source file transfer as complete.
  • By reading the metadata file, the file validation function validates that the names and sizes of the files that land in the Regional landing zone S3 bucket match with the files on the HPE on-premises server.
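The two validations above can be sketched as follows. The tail file layout (one `name,size` row per file, no header) is an assumption, since the post only says the tail file holds the name and size of the source file; the function names are hypothetical.

```python
import csv
import io
from typing import Dict, List


def parse_tail_file(tail_csv: str) -> Dict[str, int]:
    """Parse a tail metadata file. Assumed layout: 'name,size' rows, no header."""
    rows = csv.reader(io.StringIO(tail_csv))
    return {row[0].strip(): int(row[1]) for row in rows if row}


def validate_landing(expected: Dict[str, int], landed: Dict[str, int]) -> List[str]:
    """Compare expected files (from the tail file) with what landed in S3.

    Returns a list of problems; an empty list means the file pair is valid.
    """
    problems = []
    for name, size in expected.items():
        if name not in landed:
            problems.append(f"missing file: {name}")
        elif landed[name] != size:
            problems.append(
                f"size mismatch for {name}: expected {size}, got {landed[name]}"
            )
    return problems
```

Here `landed` would be built from the object names and sizes listed in the Regional landing zone bucket; a non-empty result corresponds to moving the source files to the reject zone.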

6. Data quality checks

When the files land in the processing zone, the Step Functions workflow invokes another Lambda function that converts the raw files to CSV format followed by stringent data quality checks. The final validated CSV files are loaded into the temp raw zone S3 folder.

The data quality (DQ) checks are managed using DQ configurations stored in Aurora PostgreSQL tables. Some examples of DQ checks include duplicate data check, null value check, and date format check. The DQ processing is managed through AWS Glue jobs, which are invoked by Lambda functions from within the Step Functions workflow. A number of data processing logics are also integrated in the DQ flow, such as the following:

  • Flag-based deduplication – For specific files, when a flag managed in the Aurora configuration table is enabled, the process removes duplicates before processing the data
  • Pre-set values replacing nulls – Similarly, a preset value of 1 or 0 stands in for a NULL in the source data, based on the value set in the configuration table
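The example checks named above (duplicate, null value, and date format) can be illustrated in plain Python. This is a sketch only: Aruba runs these checks in AWS Glue jobs driven by Aurora configuration tables, whereas the helper names, the row representation, and the default date format here are assumptions.

```python
from datetime import datetime
from typing import Dict, List

Row = Dict[str, str]


def drop_duplicates(rows: List[Row], dedupe_enabled: bool) -> List[Row]:
    """Flag-based deduplication: keep the first occurrence of each row."""
    if not dedupe_enabled:
        return rows
    seen = set()
    unique = []
    for row in rows:
        key = tuple(sorted(row.items()))
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique


def null_columns(row: Row, required: List[str]) -> List[str]:
    """Null value check: required columns that are missing or blank."""
    return [col for col in required if not (row.get(col) or "").strip()]


def has_valid_date(value: str, fmt: str = "%Y-%m-%d") -> bool:
    """Date format check against an assumed format string."""
    try:
        datetime.strptime(value, fmt)
        return True
    except ValueError:
        return False
```

The `dedupe_enabled` argument plays the role of the per-file flag that the post says is managed in the Aurora configuration table.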

7. Archiving processed files

When the CSV conversion is complete, the original raw files in the processing zone S3 folder are archived for 6 months in the archive zone S3 bucket folder. After 6 months, the files on AWS are deleted, with the original raw files retained in the HPE source system.

8. Copying to Amazon Redshift

When the data quality checks and data processing are complete, the data is loaded from the S3 temp raw zone into the curated zone on an Amazon Redshift provisioned cluster, using the COPY command.
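As a rough illustration of this load step, the helper below assembles a Redshift COPY statement for CSV files in Amazon S3. The table name, S3 prefix, and IAM role ARN are placeholders, and the choice of `IGNOREHEADER 1` assumes the validated CSV files carry a header row, which the post does not state.

```python
def build_copy_statement(table: str, s3_prefix: str, iam_role_arn: str) -> str:
    """Build a COPY statement that loads CSV files from an S3 prefix.

    All arguments are assumed to come from trusted configuration; they are
    interpolated directly into the SQL text.
    """
    return (
        f"COPY {table}\n"
        f"FROM '{s3_prefix}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS CSV\n"
        "IGNOREHEADER 1;"
    )


# Placeholder values for illustration only.
statement = build_copy_statement(
    table="curated.orders",
    s3_prefix="s3://<bucket>/temp-raw/orders/",
    iam_role_arn="arn:aws:iam::<account-id>:role/<role-name>",
)
```

The resulting string could be run through any Redshift SQL client; how Aruba actually issues the command is not described in the post.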

9. Running stored procedures

AWS Glue jobs then orchestrate the Redshift stored procedures that load the data from the curated zone into the Redshift publish zone. The Redshift publish zone is a different set of tables in the same Redshift provisioned cluster. The Redshift stored procedures process and load the data into fact and dimension tables in a star schema.

10. UI integration

Amazon OpenSearch Service is also integrated with the flow for publishing mass notifications to the end-users through the user interface (UI). The users can also send messages and post updates via the UI with the OpenSearch Service integration.

11. Code Deployment

Aruba uses AWS CodeCommit and AWS CodePipeline to deploy and manage a bi-monthly code release cycle, the frequency for which can be increased on-demand as per deployment needs. The release happens across four environments – Development, Testing, UAT and Production – deployed through DevOps discipline, thus enabling shorter turnaround time to ever-changing user requirements and upstream data source changes.

12. Security & Encryption

User access to the Aruba SC360 portal is managed via SSO with MFA, and data security is managed via direct integration of the AWS solution with HPE IT’s unified access management API. All the data pipelines between HPE on-premises sources and Amazon S3 are encrypted for enhanced security.

13. Data Consumption

The Aruba SC360 application provides a ‘Private Space’ feature to other BI/Analytics teams within HPE to run and manage their own data ingestion pipeline. This has been built using the Amazon Redshift data sharing feature, which has enabled Aruba to securely share access to live data in their Amazon Redshift cluster, without manually moving or copying the data. Thus, the HPE internal teams could build their own data workloads on core Aruba SC360 data while maintaining data security and code isolation.

14. Final Steps

The data is finally fetched into the publication layer, which consists of a ReactJS-based user interface accessing the data in the Redshift publish zone using Spring Boot REST APIs. Along with data from the Redshift data warehouse, notifications updated in the OpenSearch Service tables are also fetched and loaded into the UI. Amazon Aurora PostgreSQL is used to maintain the configuration values for populating the UI. To build BI dashboards, Aruba opted to continue using their existing third-party BI tool due to its familiarity among internal teams.

Conclusion

In this post, we showed you how HPE Aruba Supply Chain successfully re-architected and deployed their data solution by adopting a modern data architecture on AWS.

The new solution has helped Aruba integrate data from multiple sources, along with optimizing their cost, performance, and scalability. This has also allowed the Aruba Supply Chain leadership to receive in-depth and timely insights for better decision-making, thereby elevating the customer experience.

To learn more about the AWS services used to build modern data solutions on AWS, refer to the AWS public documentation and stay up to date through the AWS Big Data Blog.


About the authors

Hardeep Randhawa is a Senior Manager – Big Data & Analytics, Solution Architecture at HPE, recognized for stewarding enterprise-scale programs and deployments. He has led a recent Big Data EAP (Enterprise Analytics Platform) build with one of the largest global SAP HANA/S4 implementations at HPE.

Abhay Kumar is a Lead Data Engineer in Aruba Supply Chain Analytics and manages the Cloud Infrastructure for the Application at HPE. With 11+ years of experience in IT industry domains like banking and supply chain, Abhay has a strong background in Cloud Technologies, Data Analytics, Data Management, and Big Data systems. In his spare time, he likes reading, exploring new places and watching movies.

Ritesh Chaman is a Senior Technical Account Manager at Amazon Web Services. With 14 years of experience in the IT industry, Ritesh has a strong background in Data Analytics, Data Management, Big Data systems and Machine Learning. In his spare time, he loves cooking, watching sci-fi movies, and playing sports.

Sushmita Barthakur is a Senior Solutions Architect at Amazon Web Services, helping Enterprise customers architect their workloads on AWS. With a strong background in Data Analytics and Data Management, she has extensive experience helping customers architect and build Business Intelligence and Analytics Solutions, both on-premises and in the cloud. Sushmita is based out of Tampa, FL and enjoys traveling, reading and playing tennis.

Amazon EMR 7.1 runtime for Apache Spark and Iceberg can run Spark workloads 2.7 times faster than Apache Spark 3.5.1 and Iceberg 1.5.2

Post Syndicated from Hari Kishore Chaparala original https://aws.amazon.com/blogs/big-data/amazon-emr-7-1-runtime-for-apache-spark-and-iceberg-can-run-spark-workloads-2-7-times-faster-than-apache-spark-3-5-1-and-iceberg-1-5-2/

In this post, we explore the performance benefits of using the Amazon EMR runtime for Apache Spark and Apache Iceberg compared to running the same workloads with open source Spark 3.5.1 on Iceberg tables. Iceberg is a popular open source high-performance format for large analytic tables. Our benchmarks demonstrate that Amazon EMR can run TPC-DS 3 TB workloads 2.7 times faster, reducing the runtime from 1.548 hours to 0.564 hours. Additionally, the cost efficiency improves by 2.2 times, with the total cost decreasing from $16.09 to $7.23 when using Amazon Elastic Compute Cloud (Amazon EC2) On-Demand r5d.4xlarge instances, providing observable gains for data processing tasks.

The Amazon EMR runtime for Apache Spark offers a high-performance runtime environment while maintaining 100% API compatibility with open source Spark and the Iceberg table format. In Run Apache Spark 3.5.1 workloads 4.5 times faster with Amazon EMR runtime for Apache Spark, we detailed some of the optimizations, showing a 4.5 times faster runtime and 2.8 times better price-performance compared to open source Spark 3.5.1 on the TPC-DS 3 TB benchmark. However, many of the optimizations are geared towards DataSource V1, whereas Iceberg uses Spark DataSource V2. Recognizing this, we have focused on migrating some of the existing optimizations in the EMR runtime for Spark to DataSource V2 and introducing Iceberg-specific enhancements. These improvements are built on top of the Spark runtime enhancements on query planning, physical plan operator improvements, and optimizations with Amazon Simple Storage Service (Amazon S3) and the Java runtime. We have added eight new optimizations incrementally since the Amazon EMR 6.15 release in 2023, which are present in Amazon EMR 7.1 and turned on by default. Some of the improvements include the following:

  • Optimizing DataSource V2 in Spark:
    • Dynamic filtering on non-partitioned columns
    • Removing redundant broadcast hash joins
    • Partial hash aggregate pushdowns
    • Bloom filter-based joins
  • Iceberg-specific enhancements:
    • Data prefetch
    • Support for file size-based estimations

Amazon EMR on EC2, Amazon EMR Serverless, Amazon EMR on Amazon EKS, and Amazon EMR on AWS Outposts all use the optimized runtimes. Refer to Working with Apache Iceberg in Amazon EMR and Best practices for optimizing Apache Iceberg workloads for more details.

Benchmark results for Amazon EMR 7.1 vs. open source Spark 3.5.1 and Iceberg 1.5.2

To assess the Spark engine’s performance with the Iceberg table format, we performed benchmark tests using the 3 TB TPC-DS dataset, version 2.13 (our results derived from the TPC-DS dataset are not directly comparable to the official TPC-DS results due to setup differences). Benchmark tests for the EMR runtime for Spark and Iceberg were conducted on Amazon EMR 7.1 clusters with Spark 3.5.0 and Iceberg 1.4.3-amzn-0 versions, and open source Spark 3.5.1 and Iceberg 1.5.2 were deployed on EC2 clusters designated for open source runs.

The setup instructions and technical details are available in our GitHub repository. To minimize the influence of external catalogs like AWS Glue and Hive, we used the Hadoop catalog for the Iceberg tables. This uses the underlying file system, specifically Amazon S3, as the catalog. We can define this setup by configuring the property spark.sql.catalog.<catalog_name>.type. The fact tables used the default partitioning by the date column, with the number of partitions ranging from 200–2,100. No precalculated statistics were used for these tables.

We ran a total of 104 SparkSQL queries in three sequential rounds, and the average runtime of each query across these rounds was taken for comparison. The average runtime for the three rounds on Amazon EMR 7.1 with Iceberg enabled was 0.56 hours, demonstrating a 2.7-fold speed increase compared to open source Spark 3.5.1 and Iceberg 1.5.2. The following figure presents the total runtimes in seconds.

The following table summarizes the metrics.

| Metric | Amazon EMR 7.1 on EC2 | Open Source Spark 3.5.1 and Iceberg 1.5.2 |
|---|---|---|
| Average runtime in seconds | 2033.17 | 5575.19 |
| Geometric mean over queries in seconds | 10.13153 | 20.34651 |
| Cost* | $7.23 | $16.09 |

*Detailed cost estimates are discussed later in this post.

The following chart demonstrates the per-query performance improvement of Amazon EMR 7.1 relative to open source Spark 3.5.1 and Iceberg 1.5.2. The extent of the speedup varies from one query to another, ranging from 9.6 times faster for q93 to 1.04 times faster for q34, with Amazon EMR outperforming the open source Spark with Iceberg tables. The horizontal axis arranges the TPC-DS 3 TB benchmark queries in descending order based on the performance improvement seen with Amazon EMR, and the vertical axis depicts the magnitude of this speedup as a ratio.

Cost comparison

Our benchmark provides the total runtime and geometric mean data to assess the performance of Spark and Iceberg in a complex, real-world decision support scenario. For additional insights, we also examine the cost aspect. We calculate cost estimates using formulas that account for EC2 On-Demand instances, Amazon Elastic Block Store (Amazon EBS), and Amazon EMR expenses.

  • Amazon EC2 cost (includes SSD cost) = number of instances * r5d.4xlarge hourly rate * job runtime in hours
    • r5d.4xlarge hourly rate = $1.152 per hour
  • Root Amazon EBS cost = number of instances * Amazon EBS per GB-hourly rate * root EBS volume size * job runtime in hours
  • Amazon EMR cost = number of instances * r5d.4xlarge Amazon EMR cost * job runtime in hours
    • r5d.4xlarge Amazon EMR cost = $0.27 per hour
  • Total cost = Amazon EC2 cost + root Amazon EBS cost + Amazon EMR cost

The calculations reveal that the Amazon EMR 7.1 benchmark yields a 2.2-fold cost efficiency improvement over open source Spark 3.5.1 and Iceberg 1.5.2 in running the benchmark job.

| Metric | Amazon EMR 7.1 | Open Source Spark 3.5.1 and Iceberg 1.5.2 |
|---|---|---|
| Runtime in hours | 0.564 | 1.548 |
| Number of EC2 instances | 9 | 9 |
| Amazon EBS size | 20 GB | 20 GB |
| Amazon EC2 cost | $5.85 | $16.05 |
| Amazon EBS cost | $0.01 | $0.04 |
| Amazon EMR cost | $1.37 | $0 |
| Total cost | $7.23 | $16.09 |
| Cost savings | Amazon EMR 7.1 is 2.2 times better | Baseline |
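The cost formulas can be checked with a few lines of Python. The EC2 and Amazon EMR hourly rates, instance count, volume size, and runtimes come from this post; the Amazon EBS rate is not stated, so the sketch assumes $0.10 per GB-month converted to an hourly rate over a 30-day month.

```python
HOURS_PER_MONTH = 24 * 30  # assumption for converting a monthly EBS rate


def benchmark_cost(runtime_hours, emr_rate_per_hour, instances=9,
                   ec2_rate_per_hour=1.152, ebs_gb=20,
                   ebs_rate_per_gb_month=0.10):
    """Apply the three cost formulas and return each component and the total.

    ebs_rate_per_gb_month is an assumed value, not one given in the post.
    """
    ebs_rate_per_gb_hour = ebs_rate_per_gb_month / HOURS_PER_MONTH
    ec2 = instances * ec2_rate_per_hour * runtime_hours
    ebs = instances * ebs_rate_per_gb_hour * ebs_gb * runtime_hours
    emr = instances * emr_rate_per_hour * runtime_hours
    return {"ec2": ec2, "ebs": ebs, "emr": emr, "total": ec2 + ebs + emr}


# Amazon EMR 7.1 run: pays the EMR uplift on top of EC2 and EBS.
emr_run = benchmark_cost(runtime_hours=0.564, emr_rate_per_hour=0.27)
# Open source run on plain EC2: no EMR charge.
oss_run = benchmark_cost(runtime_hours=1.548, emr_rate_per_hour=0.0)

cost_ratio = oss_run["total"] / emr_run["total"]
print(f"EMR: ${emr_run['total']:.2f}, OSS: ${oss_run['total']:.2f}, "
      f"ratio: {cost_ratio:.1f}x")
```

Under these assumptions the rounded totals agree with the table ($7.23 and $16.09), and the ratio rounds to 2.2.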

In addition to the time-based metrics discussed so far, data from Spark event logs shows that Amazon EMR 7.1 scanned approximately 3.4 times less data from Amazon S3 and 4.1 times fewer records than the open source version in the TPC-DS 3 TB benchmark. This reduction in Amazon S3 data scanning contributes directly to cost savings for Amazon EMR workloads.

Run open source Spark benchmarks on Iceberg tables

We used separate EC2 clusters, each equipped with nine r5d.4xlarge instances, for testing both open source Spark 3.5.1 and Iceberg 1.5.2 and Amazon EMR 7.1. The primary node was equipped with 16 vCPU and 128 GB of memory, and the eight worker nodes together had 128 vCPU and 1024 GB of memory. We conducted tests using the Amazon EMR default settings to showcase the typical user experience and minimally adjusted the settings of Spark and Iceberg to maintain a balanced comparison.

The following table summarizes the Amazon EC2 configurations for the primary node and eight worker nodes of type r5d.4xlarge.

| EC2 Instance | vCPU | Memory (GiB) | Instance Storage (GB) | EBS Root Volume (GB) |
|---|---|---|---|---|
| r5d.4xlarge | 16 | 128 | 2 x 300 NVMe SSD | 20 |

Prerequisites

The following prerequisites are required to run the benchmarking:

  1. Using the instructions in the emr-spark-benchmark GitHub repo, set up the TPC-DS source data in your S3 bucket and on your local computer.
  2. Build the benchmark application following the steps provided in Steps to build spark-benchmark-assembly application and copy the benchmark application to your S3 bucket. Alternatively, copy spark-benchmark-assembly-3.5.1.jar to your S3 bucket.
  3. Create Iceberg tables from the TPC-DS source data. Follow the instructions on GitHub to create Iceberg tables using the Hadoop catalog. For example, the following code uses an EMR 7.1 cluster with Iceberg enabled to create the tables:
aws emr add-steps --cluster-id <cluster-id> --steps Type=Spark,Name="Create Iceberg Tables",
Args=[--class,com.amazonaws.eks.tpcds.CreateIcebergTables,
--conf,spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,
--conf,spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog,
--conf,spark.sql.catalog.hadoop_catalog.type=hadoop,
--conf,spark.sql.catalog.hadoop_catalog.warehouse=s3://<bucket>/<warehouse_path>/,
--conf,spark.sql.catalog.hadoop_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO,
s3://<bucket>/<jar_location>/spark-benchmark-assembly-3.5.1.jar,
s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/,
/home/hadoop/tpcds-kit/tools,parquet,3000,true,<database_name>,true,true],ActionOnFailure=CONTINUE 
--region <AWS region>

Note the Hadoop catalog warehouse location and database name from the preceding step. We use the same tables to run benchmarks with Amazon EMR 7.1 and open source Spark and Iceberg.

This benchmark application is built from the branch tpcds-v2.13_iceberg. If you’re building a new benchmark application, switch to the correct branch after downloading the source code from the GitHub repo.

Create and configure a YARN cluster on Amazon EC2

To compare Iceberg performance between Amazon EMR on Amazon EC2 and open source Spark on Amazon EC2, follow the instructions in the emr-spark-benchmark GitHub repo to create an open source Spark cluster on Amazon EC2 using Flintrock with eight worker nodes.

Based on the cluster selection for this test, the following configurations are used:

Run the TPC-DS benchmark with Apache Spark 3.5.1 and Iceberg 1.5.2

Complete the following steps to run the TPC-DS benchmark:

  1. Log in to the open source cluster primary node using flintrock login $CLUSTER_NAME.
  2. Submit your Spark job:
    1. Choose the correct Iceberg catalog warehouse location and database that has the created Iceberg tables.
    2. The results are created in s3://<YOUR_S3_BUCKET>/benchmark_run.
    3. You can track progress in /media/ephemeral0/spark_run.log.
spark-submit \
--master yarn \
--deploy-mode client \
--class com.amazonaws.eks.tpcds.BenchmarkSQL \
--conf spark.driver.cores=4 \
--conf spark.driver.memory=10g \
--conf spark.executor.cores=16 \
--conf spark.executor.memory=100g \
--conf spark.executor.instances=8 \
--conf spark.network.timeout=2000 \
--conf spark.executor.heartbeatInterval=300s \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.shuffle.service.enabled=false \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.InstanceProfileCredentialsProvider \
--conf spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.jars.packages=org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.iceberg:iceberg-aws-bundle:1.5.2 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions   \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog    \
--conf spark.sql.catalog.local.type=hadoop  \
--conf spark.sql.catalog.local.warehouse=s3a://<YOUR_S3_BUCKET>/<warehouse_path>/ \
--conf spark.sql.defaultCatalog=local   \
--conf spark.sql.catalog.local.io-impl=org.apache.iceberg.aws.s3.S3FileIO   \
spark-benchmark-assembly-3.5.1.jar   \
s3://<YOUR_S3_BUCKET>/benchmark_run 3000 1 false  \
q1-v2.13,q10-v2.13,q11-v2.13,q12-v2.13,q13-v2.13,q14a-v2.13,q14b-v2.13,q15-v2.13,q16-v2.13,\
q17-v2.13,q18-v2.13,q19-v2.13,q2-v2.13,q20-v2.13,q21-v2.13,q22-v2.13,q23a-v2.13,q23b-v2.13,\
q24a-v2.13,q24b-v2.13,q25-v2.13,q26-v2.13,q27-v2.13,q28-v2.13,q29-v2.13,q3-v2.13,q30-v2.13,\
q31-v2.13,q32-v2.13,q33-v2.13,q34-v2.13,q35-v2.13,q36-v2.13,q37-v2.13,q38-v2.13,q39a-v2.13,\
q39b-v2.13,q4-v2.13,q40-v2.13,q41-v2.13,q42-v2.13,q43-v2.13,q44-v2.13,q45-v2.13,q46-v2.13,\
q47-v2.13,q48-v2.13,q49-v2.13,q5-v2.13,q50-v2.13,q51-v2.13,q52-v2.13,q53-v2.13,q54-v2.13,\
q55-v2.13,q56-v2.13,q57-v2.13,q58-v2.13,q59-v2.13,q6-v2.13,q60-v2.13,q61-v2.13,q62-v2.13,\
q63-v2.13,q64-v2.13,q65-v2.13,q66-v2.13,q67-v2.13,q68-v2.13,q69-v2.13,q7-v2.13,q70-v2.13,\
q71-v2.13,q72-v2.13,q73-v2.13,q74-v2.13,q75-v2.13,q76-v2.13,q77-v2.13,q78-v2.13,q79-v2.13,\
q8-v2.13,q80-v2.13,q81-v2.13,q82-v2.13,q83-v2.13,q84-v2.13,q85-v2.13,q86-v2.13,q87-v2.13,\
q88-v2.13,q89-v2.13,q9-v2.13,q90-v2.13,q91-v2.13,q92-v2.13,q93-v2.13,q94-v2.13,q95-v2.13,\
q96-v2.13,q97-v2.13,q98-v2.13,q99-v2.13,ss_max-v2.13    \
true <database> > /media/ephemeral0/spark_run.log 2>&1 &!

Summarize the results

After the Spark job finishes, retrieve the test result file from the output S3 bucket at s3://<YOUR_S3_BUCKET>/benchmark_run/timestamp=xxxx/summary.csv/xxx.csv. This can be done either through the Amazon S3 console by navigating to the specified bucket location or by using the AWS Command Line Interface (AWS CLI). The Spark benchmark application organizes the data by creating a timestamp folder and placing a summary file within a folder labeled summary.csv. The output CSV files contain four columns without headers:

  • Query name
  • Median time
  • Minimum time
  • Maximum time

With the data from three separate test runs with one iteration each time, we can calculate the average and geometric mean of the benchmark runtimes.
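One way to do this calculation is sketched below. It assumes each run's summary file has already been downloaded and read into a string, uses the median time column as the per-query runtime, and averages each query across runs before computing the total and the geometric mean; the function names are hypothetical and the benchmark repository may aggregate differently.

```python
import csv
import io
import math
from typing import Dict, List


def read_summary(csv_text: str) -> Dict[str, float]:
    """Map query name to median time from a headerless four-column summary CSV."""
    reader = csv.reader(io.StringIO(csv_text))
    return {row[0]: float(row[1]) for row in reader if row}


def summarize(runs: List[Dict[str, float]]) -> Dict[str, float]:
    """Average each query across runs, then total and geometric-mean the averages."""
    queries = runs[0].keys()
    per_query_avg = {q: sum(run[q] for run in runs) / len(runs) for q in queries}
    total = sum(per_query_avg.values())
    # Geometric mean computed in log space to avoid overflow on long query lists.
    log_sum = sum(math.log(t) for t in per_query_avg.values())
    geomean = math.exp(log_sum / len(per_query_avg))
    return {"total_runtime": total, "geometric_mean": geomean}
```

Calling `summarize` with the three parsed summaries yields one total runtime and one geometric mean per engine, which can then be compared across the two setups.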

Run the TPC-DS benchmark with the EMR runtime for Spark

Most of the instructions are similar to Steps to run Spark Benchmarking with a few Iceberg-specific details.

Prerequisites

Complete the following prerequisite steps:

  1. Run aws configure to configure the AWS CLI shell to point to the benchmarking AWS account. Refer to Configure the AWS CLI for instructions.
  2. Upload the benchmark application JAR file to Amazon S3.

Deploy the EMR cluster and run the benchmark job

Complete the following steps to run the benchmark job:

  1. Use the AWS CLI command as shown in Deploy EMR on EC2 Cluster and run benchmark job to spin up an EMR on EC2 cluster. Make sure to enable Iceberg. See Create an Iceberg cluster for more details. Choose the correct Amazon EMR version, root volume size, and same resource configuration as the open source Flintrock setup. Refer to create-cluster for a detailed description of the AWS CLI options.
  2. Store the cluster ID from the response. We need this for the next step.
  3. Submit the benchmark job in Amazon EMR using add-steps from the AWS CLI:
    1. Replace <cluster ID> with the cluster ID from Step 2.
    2. The benchmark application is at s3://<your-bucket>/spark-benchmark-assembly-3.5.1.jar.
    3. Choose the correct Iceberg catalog warehouse location and database that has the created Iceberg tables. This should be the same as the one used for the open source TPC-DS benchmark run.
    4. The results will be in s3://<your-bucket>/benchmark_run.
aws emr add-steps   --cluster-id <cluster-id>
--steps Type=Spark,Name="SPARK Iceberg EMR TPCDS Benchmark Job",
Args=[--class,com.amazonaws.eks.tpcds.BenchmarkSQL,
--conf,spark.driver.cores=4,
--conf,spark.driver.memory=10g,
--conf,spark.executor.cores=16,
--conf,spark.executor.memory=100g,
--conf,spark.executor.instances=8,
--conf,spark.network.timeout=2000,
--conf,spark.executor.heartbeatInterval=300s,
--conf,spark.dynamicAllocation.enabled=false,
--conf,spark.shuffle.service.enabled=false,
--conf,spark.sql.iceberg.data-prefetch.enabled=true,
--conf,spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,
--conf,spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog,
--conf,spark.sql.catalog.local.type=hadoop,
--conf,spark.sql.catalog.local.warehouse=s3://<your-bucket>/<warehouse-path>,
--conf,spark.sql.defaultCatalog=local,
--conf,spark.sql.catalog.local.io-impl=org.apache.iceberg.aws.s3.S3FileIO,
s3://<your-bucket>/spark-benchmark-assembly-3.5.1.jar,
s3://<your-bucket>/benchmark_run,3000,1,false,
'q1-v2.13\,q10-v2.13\,q11-v2.13\,q12-v2.13\,q13-v2.13\,q14a-v2.13\,
q14b-v2.13\,q15-v2.13\,q16-v2.13\,q17-v2.13\,q18-v2.13\,q19-v2.13\,
q2-v2.13\,q20-v2.13\,q21-v2.13\,q22-v2.13\,q23a-v2.13\,q23b-v2.13\,
q24a-v2.13\,q24b-v2.13\,q25-v2.13\,q26-v2.13\,q27-v2.13\,q28-v2.13\,
q29-v2.13\,q3-v2.13\,q30-v2.13\,q31-v2.13\,q32-v2.13\,q33-v2.13\,
q34-v2.13\,q35-v2.13\,q36-v2.13\,q37-v2.13\,q38-v2.13\,q39a-v2.13\,
q39b-v2.13\,q4-v2.13\,q40-v2.13\,q41-v2.13\,q42-v2.13\,q43-v2.13\,
q44-v2.13\,q45-v2.13\,q46-v2.13\,q47-v2.13\,q48-v2.13\,q49-v2.13\,
q5-v2.13\,q50-v2.13\,q51-v2.13\,q52-v2.13\,q53-v2.13\,q54-v2.13\,
q55-v2.13\,q56-v2.13\,q57-v2.13\,q58-v2.13\,q59-v2.13\,q6-v2.13\,
q60-v2.13\,q61-v2.13\,q62-v2.13\,q63-v2.13\,q64-v2.13\,q65-v2.13\,
q66-v2.13\,q67-v2.13\,q68-v2.13\,q69-v2.13\,q7-v2.13\,q70-v2.13\,
q71-v2.13\,q72-v2.13\,q73-v2.13\,q74-v2.13\,q75-v2.13\,q76-v2.13\,
q77-v2.13\,q78-v2.13\,q79-v2.13\,q8-v2.13\,q80-v2.13\,q81-v2.13\,
q82-v2.13\,q83-v2.13\,q84-v2.13\,q85-v2.13\,q86-v2.13\,q87-v2.13\,
q88-v2.13\,q89-v2.13\,q9-v2.13\,q90-v2.13\,q91-v2.13\,q92-v2.13\,
q93-v2.13\,q94-v2.13\,q95-v2.13\,q96-v2.13\,q97-v2.13\,q98-v2.13\,
q99-v2.13\,ss_max-v2.13',true,<database>],ActionOnFailure=CONTINUE 
--region <aws-region>

Summarize the results

After the step is complete, you can see the summarized benchmark result at s3://<your-bucket>/benchmark_run/timestamp=xxxx/summary.csv/xxx.csv in the same way as the previous run and compute the average and geometric mean of the query runtimes.
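As a minimal sketch of that last calculation, the following Python snippet computes the average and geometric mean from a list of runtimes. The function name and the sample runtimes are hypothetical; in practice you would load the values from the summary CSV, whose column layout is not shown in this post.

```python
import math

def summarize_runtimes(runtimes_s):
    """Return (arithmetic mean, geometric mean) of positive query runtimes."""
    if not runtimes_s:
        raise ValueError("no runtimes provided")
    if any(r <= 0 for r in runtimes_s):
        raise ValueError("runtimes must be positive")
    arithmetic = sum(runtimes_s) / len(runtimes_s)
    # Average the logarithms, then exponentiate; this avoids overflow
    # from multiplying many runtimes together directly.
    geometric = math.exp(sum(math.log(r) for r in runtimes_s) / len(runtimes_s))
    return arithmetic, geometric

# Hypothetical runtimes in seconds for three queries
mean_s, geomean_s = summarize_runtimes([2.0, 8.0, 4.0])
print(f"average={mean_s:.2f}s geometric mean={geomean_s:.2f}s")
```

The geometric mean is less sensitive than the average to a few very long-running queries, which is why benchmark summaries often report both.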

Clean up

To prevent any future charges, delete the resources you created by following the instructions provided in the Cleanup section of the GitHub repository.

Summary

Amazon EMR is consistently enhancing the EMR runtime for Spark when used with Iceberg tables, achieving a performance that is 2.7 times faster than open source Spark 3.5.1 and Iceberg 1.5.2 on TPC-DS 3 TB, v2.13. We encourage you to keep up to date with the latest Amazon EMR releases to fully benefit from ongoing performance improvements.

To stay informed, subscribe to the AWS Big Data Blog’s RSS feed, where you can find updates on the EMR runtime for Spark and Iceberg, as well as tips on configuration best practices and tuning recommendations.


About the authors

Hari Kishore Chaparala is a software development engineer for Amazon EMR at Amazon Web Services.

Udit Mehrotra is an Engineering Manager for EMR at Amazon Web Services.

Unlock scalable analytics with a secure connectivity pattern in AWS Glue to read from or write to Snowflake

Post Syndicated from Caio Montovani original https://aws.amazon.com/blogs/big-data/unlock-scalable-analytics-with-a-secure-connectivity-pattern-in-aws-glue-to-read-from-or-write-to-snowflake/

In today’s data-driven world, the ability to seamlessly integrate and utilize diverse data sources is critical for gaining actionable insights and driving innovation. As organizations increasingly rely on data stored across various platforms, such as Snowflake, Amazon Simple Storage Service (Amazon S3), and various software as a service (SaaS) applications, the challenge of bringing these disparate data sources together has never been more pressing.

AWS Glue is a robust data integration service that facilitates the consolidation of data from different origins, empowering businesses to use the full potential of their data assets. By using AWS Glue to integrate data from Snowflake, Amazon S3, and SaaS applications, organizations can unlock new opportunities in generative artificial intelligence (AI), machine learning (ML), business intelligence (BI), and self-service analytics or feed data to underlying applications.

In this post, we explore how AWS Glue can serve as the data integration service to bring the data from Snowflake for your data integration strategy, enabling you to harness the power of your data ecosystem and drive meaningful outcomes across various use cases.

Use case

Consider a large ecommerce company that relies heavily on data-driven insights to optimize its operations, marketing strategies, and customer experiences. The company stores vast amounts of transactional data, customer information, and product catalogs in Snowflake. However, they also generate and collect data from various other sources, such as web logs stored in Amazon S3, social media platforms, and third-party data providers. To gain a comprehensive understanding of their business and make informed decisions, the company needs to integrate and analyze data from all these sources seamlessly.

One crucial business requirement for the ecommerce company is to generate a Pricing Summary Report that provides a detailed analysis of pricing and discounting strategies. This report is essential for understanding revenue streams, identifying opportunities for optimization, and making data-driven decisions regarding pricing and promotions. After the Pricing Summary Report is generated and stored in Amazon S3, the company can use AWS analytics services to generate interactive BI dashboards and run one-time queries on the report. This allows business analysts and decision-makers to gain valuable insights, visualize key metrics, and explore the data in depth, enabling informed decision-making and strategic planning for pricing and promotional strategies.

Solution overview

The following architecture diagram illustrates a secure and efficient solution for integrating Snowflake data with Amazon S3 using the native Snowflake connector in AWS Glue. This setup uses AWS PrivateLink to provide secure connectivity between AWS services across different virtual private clouds (VPCs), eliminating the need to expose data to the public internet, which is a critical requirement for many organizations.

BDB-4354-architecture

The following are the key components and steps in the integration process:

  1. Establish a secure, private connection between your AWS account and your Snowflake account using PrivateLink. This involves creating VPC endpoints in both the AWS and Snowflake VPCs, making sure data transfer remains within the AWS network.
  2. Use Amazon Route 53 to create a private hosted zone that resolves the Snowflake endpoint within your VPC. This allows AWS Glue jobs to connect to Snowflake using a private DNS name, maintaining the security and integrity of the data transfer.
  3. Create an AWS Glue job to handle the extract, transform, and load (ETL) process on data from Snowflake to Amazon S3. The AWS Glue job uses the secure connection established by the VPC endpoints to access Snowflake data. Snowflake credentials are securely stored in AWS Secrets Manager, and the AWS Glue job retrieves them at runtime to authenticate and connect to Snowflake. A VPC endpoint for Secrets Manager lets the job reach the service without traversing the public internet, enhancing security and performance.
  4. Store the extracted and transformed data in Amazon S3. Organize the data into appropriate structures, such as partitioned folders, to optimize query performance and data management. A VPC endpoint for Amazon S3 lets the job reach the service without traversing the public internet, enhancing security and performance. We also use Amazon S3 to store AWS Glue scripts, logs, and temporary data generated during the ETL process.

This approach offers the following benefits:

  • Enhanced security – By using PrivateLink and VPC endpoints, data transfer between Snowflake and Amazon S3 is secured within the AWS network, reducing exposure to potential security threats.
  • Efficient data integration – AWS Glue simplifies the ETL process, providing a scalable and flexible solution for data integration between Snowflake and Amazon S3.
  • Cost-effectiveness – Using Amazon S3 for data storage, combined with the AWS Glue pay-as-you-go pricing model, helps optimize costs associated with data management and integration.
  • Scalability and flexibility – The architecture supports scalable data transfers and can be extended to integrate additional data sources and destinations as needed.

By following this architecture and taking advantage of the capabilities of AWS Glue, PrivateLink, and associated AWS services, organizations can achieve a robust, secure, and efficient data integration solution, enabling them to harness the full potential of their Snowflake and Amazon S3 data for advanced analytics and BI.

Prerequisites

Complete the following prerequisites before setting up the solution:

  1. Verify that you have access to an AWS account with the necessary permissions to provision resources in services such as Route 53, Amazon S3, AWS Glue, Secrets Manager, and Amazon Virtual Private Cloud (Amazon VPC) using AWS CloudFormation, which lets you model, provision, and manage AWS and third-party resources by treating infrastructure as code.
  2. Confirm that you have access to Snowflake hosted in AWS with required permissions to run the steps to configure PrivateLink. Refer to Enabling AWS PrivateLink in the Snowflake documentation to verify the steps, required access level, and service level to set the configurations. After you enable PrivateLink, save the value of the following parameters provided by Snowflake to use in the next step in this post:
    1. privatelink-vpce-id
    2. privatelink-account-url
    3. privatelink_ocsp-url
    4. regionless-snowsight-privatelink-url
  3. Make sure you have a Snowflake user snowflakeUser and password snowflakePassword with required permissions to read from and write to Snowflake. The user and password are used in the AWS Glue connection to authenticate within Snowflake.
  4. If your Snowflake user doesn’t have a default warehouse set, you will need a warehouse name. We use snowflakeWarehouse as a placeholder for the warehouse name; replace it with your actual warehouse name.
  5. If you’re new to Snowflake, consider completing the Snowflake in 20 Minutes tutorial. By the end of the tutorial, you should know how to create required Snowflake objects, including warehouses, databases, and tables for storing and querying data.

Create resources with AWS CloudFormation

This post includes a CloudFormation template for a quick setup of the base resources. You can review and customize it to suit your needs. The CloudFormation template generates the following resources:

To create your resources, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack to launch the CloudFormation stack.
  3. Provide the CloudFormation stack parameters:
    1. For PrivateLinkAccountURL, enter the value of the parameter privatelink-account-url obtained in the prerequisites.
    2. For PrivateLinkOcspURL, enter the value of the parameter privatelink_ocsp-url obtained in the prerequisites.
    3. For PrivateLinkVpceId, enter the value of the parameter privatelink-vpce-id obtained in the prerequisites.
    4. For PrivateSubnet1CIDR, enter the CIDR block (IP address range) for private subnet 1.
    5. For PrivateSubnet2CIDR, enter the CIDR block (IP address range) for private subnet 2.
    6. For PrivateSubnet3CIDR, enter the CIDR block (IP address range) for private subnet 3.
    7. For PublicSubnet1CIDR, enter the CIDR block (IP address range) for public subnet 1.
    8. For RegionlessSnowsightPrivateLinkURL, enter the value of the parameter regionless-snowsight-privatelink-url obtained in the prerequisites.
    9. For VpcCIDR, enter the CIDR block (IP address range) for your VPC.
  4. Choose Next.
  5. Select I acknowledge that AWS CloudFormation might create IAM resources.
  6. Choose Submit and wait for the stack creation step to complete.
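Before you submit the stack, it can help to confirm that the subnet ranges fit inside the VPC range and don't overlap. The following is a minimal sketch using Python's standard ipaddress module; the function name and the sample CIDR values are hypothetical and not taken from the template.

```python
import ipaddress

def validate_subnets(vpc_cidr, subnet_cidrs):
    """Return a list of problems: subnets outside the VPC range or overlapping each other."""
    vpc = ipaddress.ip_network(vpc_cidr)
    subnets = [ipaddress.ip_network(cidr) for cidr in subnet_cidrs]
    problems = []
    for subnet in subnets:
        if not subnet.subnet_of(vpc):
            problems.append(f"{subnet} is outside {vpc}")
    for i, first in enumerate(subnets):
        for second in subnets[i + 1:]:
            if first.overlaps(second):
                problems.append(f"{first} overlaps {second}")
    return problems

# Hypothetical values for VpcCIDR and the four subnet parameters
issues = validate_subnets(
    "10.0.0.0/16",
    ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24", "10.0.0.0/24"],
)
print(issues or "CIDR ranges look consistent")
```

An empty list means every subnet is contained in the VPC range and no two subnets share addresses.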

After the CloudFormation stack is successfully created, you can see all the resources created on the Resources tab.

Navigate to the Outputs tab to see the outputs provided by the CloudFormation stack. Save the values of the outputs GlueSecurityGroupId, VpcId, and PrivateSubnet1Id to use in the next step in this post.

BDB-4354-cfn-output
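If you prefer to read the outputs programmatically, the following sketch flattens the Outputs list of a CloudFormation DescribeStacks response into a dictionary. The helper name and the sample response values are hypothetical. With boto3, the response would come from boto3.client("cloudformation").describe_stacks(StackName="blog-glue-snowflake"), assuming the stack name used in the clean-up section.

```python
def outputs_to_dict(describe_stacks_response):
    """Map OutputKey -> OutputValue for the first stack in a DescribeStacks response."""
    stack = describe_stacks_response["Stacks"][0]
    return {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}

# Hypothetical response fragment; real IDs will differ
sample_response = {
    "Stacks": [
        {
            "StackName": "blog-glue-snowflake",
            "Outputs": [
                {"OutputKey": "GlueSecurityGroupId", "OutputValue": "sg-0123456789abcdef0"},
                {"OutputKey": "VpcId", "OutputValue": "vpc-0123456789abcdef0"},
                {"OutputKey": "PrivateSubnet1Id", "OutputValue": "subnet-0123456789abcdef0"},
            ],
        }
    ]
}

outputs = outputs_to_dict(sample_response)
for key in ("GlueSecurityGroupId", "VpcId", "PrivateSubnet1Id"):
    print(key, "=", outputs[key])
```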

Update the Secrets Manager secret with Snowflake credentials for the AWS Glue connection

To update the Secrets Manager secret with user snowflakeUser, password snowflakePassword, and warehouse snowflakeWarehouse that you will use in the AWS Glue connection to establish a connection to Snowflake, complete the following steps:

  1. On the Secrets Manager console, choose Secrets in the navigation pane.
  2. Open the secret blog-glue-snowflake-credentials.
  3. Under Secret value, choose Retrieve secret value.

BDB-4354-secrets-manager

  4. Choose Edit.
  5. Enter the user snowflakeUser, password snowflakePassword, and warehouse snowflakeWarehouse for the keys sfUser, sfPassword, and sfWarehouse, respectively.
  6. Choose Save.
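The same secret value can be prepared in code. This is a minimal sketch that builds the JSON string with the three keys named above; the function name is hypothetical, and the placeholder values must be replaced with your own. With boto3, the string could then be passed as SecretString to the Secrets Manager put_secret_value call for the secret blog-glue-snowflake-credentials.

```python
import json

def build_snowflake_secret(user, password, warehouse):
    """Serialize Snowflake credentials under the keys sfUser, sfPassword, and sfWarehouse."""
    return json.dumps(
        {"sfUser": user, "sfPassword": password, "sfWarehouse": warehouse}
    )

# Placeholder values from this post; never hard-code real credentials
secret_string = build_snowflake_secret(
    "snowflakeUser", "snowflakePassword", "snowflakeWarehouse"
)
print(sorted(json.loads(secret_string)))
```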

Create the AWS Glue connection for Snowflake

An AWS Glue connection is an AWS Glue Data Catalog object that stores login credentials, URI strings, VPC information, and more for a particular data store. AWS Glue crawlers, jobs, and development endpoints use connections in order to access certain types of data stores. To create an AWS Glue connection to Snowflake, complete the following steps:

  1. On the AWS Glue console, in the navigation pane, under Data catalog, choose Connections.
  2. Choose Create connection.
  3. For Data sources, search for and select Snowflake.
  4. Choose Next.

BDB-4354-sf-data-source

  5. For Snowflake URL, enter https://<privatelink-account-url>.

To obtain the Snowflake PrivateLink account URL, refer to parameters obtained in the prerequisites.

  6. For AWS Secret, choose the secret blog-glue-snowflake-credentials.
  7. For VPC, choose the VpcId value obtained from the CloudFormation stack output.
  8. For Subnet, choose the PrivateSubnet1Id value obtained from the CloudFormation stack output.
  9. For Security groups, choose the GlueSecurityGroupId value obtained from the CloudFormation stack output.
  10. Choose Next.

BDB-4354-sf-connection-setup

  11. In the Connection Properties section, for Name, enter glue-snowflake-connection.
  12. Choose Next.

BDB-4354-sf-connection-properties

  13. Choose Create connection.

Create an AWS Glue job

You’re now ready to define the AWS Glue job using the Snowflake connection. To create an AWS Glue job to read from Snowflake, complete the following steps:

  1. On the AWS Glue console, under ETL jobs in the navigation pane, choose Visual ETL.

BDB-4354-glue-studio

  2. Choose the Job details tab.
  3. For Name, enter a name, for example, Pricing Summary Report Job.
  4. For Description, enter a meaningful description for the job.
  5. For IAM Role, choose the role that can run the AWS Glue job, access the source location from which the job loads the Snowflake data, and write to the target S3 location. You can find this role in your CloudFormation stack output, named blog-glue-snowflake-GlueServiceRole-*.
  6. Use the default options for Type, Glue version, Language, Worker type, Number of workers, Number of retries, and Job timeout.
  7. For Job bookmark, choose Disable.
  8. Choose Save to save the job.

BDB-4354-glue-job-details

  9. On the Visual tab, choose Add nodes.
  10. For Sources, choose Snowflake.
  11. Choose Data source – Snowflake in the AWS Glue Studio canvas.
  12. For Name, enter Snowflake_Pricing_Summary.
  13. For Snowflake connection, choose glue-snowflake-connection.
  14. For Snowflake source, select Enter a custom query.
  15. For Database, enter snowflake_sample_data.
  16. For Snowflake query, add the following Snowflake query:
SELECT l_returnflag
    , l_linestatus
    , Sum(l_quantity) AS sum_qty
    , Sum(l_extendedprice) AS sum_base_price
    , Sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price
    , Sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge
    , Avg(l_quantity) AS avg_qty
    , Avg(l_extendedprice) AS avg_price
    , Avg(l_discount) AS avg_disc
    , Count(*) AS count_order
FROM tpch_sf1.lineitem
WHERE l_shipdate <= Dateadd(day, - 90, To_date('1998-12-01'))
GROUP BY l_returnflag
    , l_linestatus
ORDER BY l_returnflag
    , l_linestatus;

The Pricing Summary Report provides a summary pricing report for all line items shipped as of a given date. The date is within 60–120 days of the greatest ship date contained in the database. The query lists totals for extended price, discounted extended price, discounted extended price plus tax, average quantity, average extended price, and average discount. These aggregates are grouped by RETURNFLAG and LINESTATUS, and listed in ascending order of RETURNFLAG and LINESTATUS. A count of the number of line items in each group is included.
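To make the aggregates concrete, the following sketch reproduces the query's logic in plain Python on a few made-up line items. The row values and helper names are hypothetical; only the column names, the 90-day cutoff, and the formulas come from the query above.

```python
from collections import defaultdict
from datetime import date, timedelta
from decimal import Decimal

# Same cutoff as the query: 90 days before 1998-12-01
CUTOFF = date(1998, 12, 1) - timedelta(days=90)

def pricing_summary(rows):
    """Filter by ship date, group by (l_returnflag, l_linestatus), and aggregate."""
    groups = defaultdict(list)
    for row in rows:
        if row["l_shipdate"] <= CUTOFF:  # WHERE l_shipdate <= cutoff
            groups[(row["l_returnflag"], row["l_linestatus"])].append(row)

    report = {}
    for key in sorted(groups):  # ORDER BY l_returnflag, l_linestatus
        items = groups[key]
        n = len(items)
        disc = [r["l_extendedprice"] * (1 - r["l_discount"]) for r in items]
        charge = [d * (1 + r["l_tax"]) for d, r in zip(disc, items)]
        report[key] = {
            "sum_qty": sum(r["l_quantity"] for r in items),
            "sum_base_price": sum(r["l_extendedprice"] for r in items),
            "sum_disc_price": sum(disc),
            "sum_charge": sum(charge),
            "avg_qty": sum(r["l_quantity"] for r in items) / n,
            "avg_price": sum(r["l_extendedprice"] for r in items) / n,
            "avg_disc": sum(r["l_discount"] for r in items) / n,
            "count_order": n,
        }
    return report

def item(flag, status, shipdate, qty, price, discount, tax):
    """Build one hypothetical line item; Decimal keeps the money arithmetic exact."""
    return {
        "l_returnflag": flag, "l_linestatus": status, "l_shipdate": shipdate,
        "l_quantity": Decimal(qty), "l_extendedprice": Decimal(price),
        "l_discount": Decimal(discount), "l_tax": Decimal(tax),
    }

rows = [
    item("A", "F", date(1998, 8, 1), "10", "100.00", "0.10", "0.05"),
    item("A", "F", date(1998, 9, 2), "20", "200.00", "0.00", "0.10"),
    item("N", "O", date(1998, 10, 1), "5", "50.00", "0.20", "0.00"),  # after cutoff
]
report = pricing_summary(rows)
print(CUTOFF, report[("A", "F")]["sum_disc_price"], report[("A", "F")]["sum_charge"])
```

The third row ships after the cutoff date, so only the (A, F) group appears in the result.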

  17. For Custom Snowflake properties, specify Key as sfSchema and Value as tpch_sf1.
  18. Choose Save.

BDB-4354-glue-source-setup
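If you author the job as a script instead of using the visual editor, the source node settings map to a set of connection options. The following is a rough sketch, not the script that AWS Glue Studio generates: the helper name is hypothetical, and the option names (connectionName, sfDatabase, sfSchema, query) reflect our understanding of the AWS Glue Snowflake connection options, so verify them against the AWS Glue documentation before relying on them.

```python
def snowflake_read_options(connection_name, database, schema, query):
    """Collect the source node settings as Snowflake connection options (assumed names)."""
    return {
        "connectionName": connection_name,  # AWS Glue connection created earlier
        "sfDatabase": database,
        "sfSchema": schema,
        "query": query,
    }

options = snowflake_read_options(
    connection_name="glue-snowflake-connection",
    database="snowflake_sample_data",
    schema="tpch_sf1",
    query="SELECT l_returnflag, l_linestatus, Count(*) AS count_order "
          "FROM tpch_sf1.lineitem GROUP BY l_returnflag, l_linestatus",
)

# Inside a Glue job, these options would be passed along the lines of:
#   glueContext.create_dynamic_frame.from_options(
#       connection_type="snowflake", connection_options=options)
print(options["connectionName"], options["sfSchema"])
```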

Next, you add the destination as an S3 bucket.

  1. On the Visual tab, choose Add nodes.
  2. For Targets, choose Amazon S3.

  3. Choose Data target – S3 bucket in the AWS Glue Studio canvas.
  4. For Name, enter S3_Pricing_Summary.
  5. For Node parents, select Snowflake_Pricing_Summary.
  6. For Format, select Parquet.
  7. For S3 Target Location, enter s3://<YourBucketName>/pricing_summary_report/ (use the name of your bucket).
  8. For Data Catalog update options, select Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions.
  9. For Database, choose db_blog_glue_snowflake.
  10. For Table name, enter tb_pricing_summary.
  11. Choose Save.
  12. Choose Run to run the job, and monitor its status on the Runs tab.

You successfully completed the steps to create an AWS Glue job that reads data from Snowflake and loads the results into an S3 bucket using a secure connectivity pattern. If you want to transform the data before loading it into Amazon S3, you can use the AWS Glue transformations available in AWS Glue Studio. These transformations enable efficient data cleansing, enrichment, and restructuring, making sure the data is in the desired format and quality for downstream processes. Refer to Editing AWS Glue managed data transform nodes for more information.

Validate the results

After the job is complete, you can validate the output of the ETL job run in Athena, a serverless interactive analytics service. To validate the output, complete the following steps:

  1. On the Athena console, choose Launch Query Editor.
  2. For Workgroup, choose blog-workgroup.
  3. If the message “All queries run in the Workgroup, blog-workgroup, will use the following settings:” is displayed, choose Acknowledge.
  4. For Database, choose db_blog_glue_snowflake.
  5. For Query, enter the following statement:
SELECT l_returnflag
    , l_linestatus
    , sum_qty
    , sum_base_price
FROM db_blog_glue_snowflake.tb_pricing_summary
  6. Choose Run.

You have successfully validated your data for the AWS Glue job Pricing Summary Report Job.
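The same validation can be scripted. The following sketch only assembles the request parameters for the Athena StartQueryExecution API (QueryString, QueryExecutionContext, and WorkGroup); the helper name is hypothetical, and the actual call, for example through boto3's athena client and its start_query_execution method, is left as a comment because it needs AWS credentials.

```python
def athena_query_request(query, database, workgroup):
    """Assemble keyword arguments for Athena's StartQueryExecution API."""
    return {
        "QueryString": query,
        "QueryExecutionContext": {"Database": database},
        "WorkGroup": workgroup,
    }

request = athena_query_request(
    query=(
        "SELECT l_returnflag, l_linestatus, sum_qty, sum_base_price "
        "FROM db_blog_glue_snowflake.tb_pricing_summary"
    ),
    database="db_blog_glue_snowflake",
    workgroup="blog-workgroup",
)

# With boto3 (not executed here):
#   boto3.client("athena").start_query_execution(**request)
print(request["WorkGroup"], request["QueryExecutionContext"]["Database"])
```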

Clean up

To clean up your resources, complete the following tasks:

  1. Delete the AWS Glue job Pricing Summary Report Job.
  2. Delete the AWS Glue connection glue-snowflake-connection.
  3. Stop any AWS Glue interactive sessions.
  4. Delete content from the S3 bucket blog-glue-snowflake-*.
  5. Delete the CloudFormation stack blog-glue-snowflake.

Conclusion

Using the native Snowflake connector in AWS Glue provides an efficient and secure way to integrate data from Snowflake into your data pipelines on AWS. By following the steps outlined in this post, you can establish a private connectivity channel between AWS Glue and your Snowflake account using PrivateLink, Amazon VPC, security groups, and Secrets Manager.

This architecture allows you to read data from and write data to Snowflake tables directly from AWS Glue jobs running on Spark. The secure connectivity pattern prevents data transfers over the public internet, enhancing data privacy and security.

Combining AWS data integration services like AWS Glue with data platforms like Snowflake allows you to build scalable, secure data lakes and pipelines to power analytics, BI, data science, and ML use cases.

In summary, the native Snowflake connector and private connectivity model outlined here provide a performant, secure way to include Snowflake data in AWS big data workflows. This unlocks scalable analytics while maintaining data governance, compliance, and access control. For more information on AWS Glue, visit AWS Glue.


About the Authors

Caio Sgaraboto Montovani is a Sr. Specialist Solutions Architect, Data Lake and AI/ML within AWS Professional Services, developing scalable solutions according to customer needs. His vast experience has helped customers in different industries such as life sciences and healthcare, retail, banking, and aviation build solutions in data analytics, machine learning, and generative AI. He is passionate about rock and roll and cooking, and loves to spend time with his family.

Kartikay Khator is a Solutions Architect within Global Life Sciences at AWS, where he dedicates his efforts to developing innovative and scalable solutions that cater to the evolving needs of customers. His expertise lies in harnessing the capabilities of AWS analytics services. Extending beyond his professional pursuits, he finds joy and fulfillment in the world of running and hiking. Having already completed two marathons, he is currently preparing for his next marathon challenge.

Navnit Shukla, an AWS Specialist Solution Architect specializing in Analytics, is passionate about helping clients uncover valuable insights from their data. Leveraging his expertise, he develops inventive solutions that empower businesses to make informed, data-driven decisions. Notably, Navnit is the accomplished author of the book “Data Wrangling on AWS,” showcasing his expertise in the field.

Kamen Sharlandjiev is a Sr. Big Data and ETL Solutions Architect, Amazon MWAA and AWS Glue ETL expert. He’s on a mission to make life easier for customers who are facing complex data integration and orchestration challenges. His secret weapon? Fully managed AWS services that can get the job done with minimal effort. Follow Kamen on LinkedIn to keep up to date with the latest Amazon MWAA and AWS Glue features and news!

Bosco Albuquerque is a Sr. Partner Solutions Architect at AWS and has over 20 years of experience working with database and analytics products from enterprise database vendors and cloud providers. He has helped technology companies design and implement data analytics solutions and products.

Amazon OpenSearch Serverless cost-effective search capabilities, at any scale

Post Syndicated from Satish Nandi original https://aws.amazon.com/blogs/big-data/amazon-opensearch-serverless-cost-effective-search-capabilities-at-any-scale/

We’re excited to announce the new lower entry cost for Amazon OpenSearch Serverless. With support for half (0.5) OpenSearch Compute Units (OCUs) for indexing and search workloads, the entry cost is cut in half. Amazon OpenSearch Serverless is a serverless deployment option for Amazon OpenSearch Service that you can use to run search and analytics workloads without the complexities of infrastructure management, shard tuning or data lifecycle management. OpenSearch Serverless automatically provisions and scales resources to provide consistently fast data ingestion rates and millisecond query response times during changing usage patterns and application demand. 

OpenSearch Serverless offers three types of collections to help meet your needs: time-series, search, and vector. The new lower cost of entry benefits all collection types. Vector collections have come to the fore as a predominant workload when using OpenSearch Serverless as an Amazon Bedrock knowledge base. With the introduction of half OCUs, the cost for small vector workloads is halved. Time-series and search collections also benefit, especially for small workloads like proof-of-concept deployments and development and test environments.

A full OCU includes one vCPU, 6 GB of RAM, and 120 GB of storage. A half OCU offers half a vCPU, 3 GB of RAM, and 60 GB of storage. OpenSearch Serverless scales up a half OCU first to one full OCU and then in one-OCU increments. Each OCU also uses Amazon Simple Storage Service (Amazon S3) as a backing store; you pay for data stored in Amazon S3 regardless of the OCU size. The number of OCUs needed for the deployment depends on the collection type, along with ingestion and search patterns. We will go over the details later in the post and contrast how the new half OCU base brings benefits.

OpenSearch Serverless separates indexing and search computes, deploying sets of OCUs for each compute need. You can deploy OpenSearch Serverless in two forms: 1) Deployment with redundancy for production, and 2) Deployment without redundancy for development or testing.

Note: OpenSearch Serverless deploys two times the compute for both indexing and searching in redundant deployments.

OpenSearch Serverless Deployment Type

The following figure shows the architecture for OpenSearch Serverless in redundancy mode.

In redundancy mode, OpenSearch Serverless deploys two base OCUs for each compute set (indexing and search) across two Availability Zones. For small workloads under 60 GB, OpenSearch Serverless uses half OCUs as the base size. The minimum deployment is four base units, two each for indexing and search. The minimum cost is approximately $350 per month (four half OCUs). All prices are quoted based on the US-East region and 30 days a month. During normal operation, all OCUs are in operation to serve traffic. OpenSearch Serverless scales up from this baseline as needed.

For non-redundant deployments, OpenSearch Serverless deploys one base OCU for each compute set, costing $174 per month (two half OCUs).

Redundant configurations are recommended for production deployments to maintain availability; if one Availability Zone goes down, the other can continue serving traffic. Non-redundant deployments are suitable for development and testing to reduce costs. In both configurations, you can set a maximum OCU limit to manage costs. The system will scale up to this limit during peak loads if necessary, but will not exceed it.
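The minimum monthly figures can be approximated with simple arithmetic. The following sketch assumes a price of $0.24 per OCU-hour, which is our assumption and is not stated in this post; check the current OpenSearch Service pricing page for your Region. The function name is hypothetical, and the 30-day month follows the convention stated above.

```python
def monthly_ocu_cost(half_ocus, price_per_ocu_hour, hours_per_month=24 * 30):
    """Estimate monthly compute cost for a number of half OCUs (0.5 OCU each)."""
    return half_ocus * 0.5 * price_per_ocu_hour * hours_per_month

ASSUMED_PRICE = 0.24  # USD per OCU-hour; an assumption, not a figure from this post

redundant = monthly_ocu_cost(4, ASSUMED_PRICE)      # two half OCUs each for indexing and search
non_redundant = monthly_ocu_cost(2, ASSUMED_PRICE)  # one half OCU each for indexing and search
print(f"redundant: ${redundant:.2f}, non-redundant: ${non_redundant:.2f}")
```

Under this assumed price, the results land close to the approximate monthly figures quoted above. Amazon S3 storage is billed separately and is not included in this estimate.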

OpenSearch Serverless collections and resource allocations

OpenSearch Serverless uses compute units differently depending on the type of collection and keeps your data in Amazon S3. When you ingest data, OpenSearch Serverless writes it to the OCU disk and Amazon S3 before acknowledging the request, which provides both durability and performance. Depending on collection type, it additionally keeps data in the local storage of the OCUs, scaling to accommodate the storage and compute needs.

The time-series collection type is designed to be cost-efficient by limiting the amount of data kept in local storage, and keeping the remainder in Amazon S3. The number of OCUs needed depends on the amount of data and the collection’s retention period. The number of OCUs OpenSearch Serverless uses for your workload is the larger of the default minimum OCUs, or the minimum number of OCUs needed to hold the most recent portion of your data, as defined by your OpenSearch Serverless data lifecycle policy. For example, if you ingest 1 TiB per day and have a 30-day retention period, the size of the most recent data will be 1 TiB. You will need 20 OCUs [10 OCUs x 2] for indexing and another 20 OCUs [10 OCUs x 2] for search (based on the 120 GiB of storage per OCU). Accessing older data in Amazon S3 raises the latency of query responses. This tradeoff in query latency for older data saves on OCU cost.

The vector collection type uses RAM to store vector graphs, as well as disk to store indices. Vector collections keep index data in OCU local storage. When sizing for vector workloads, take both needs into account. OCU RAM limits are reached faster than OCU disk limits, causing vector collections to be bound by RAM space.

OpenSearch Serverless allocates OCU resources for vector collections as follows. Considering full OCUs, it uses 2 GB for the operating system, 2 GB for the Java heap, and the remaining 2 GB for vector graphs. It uses 120 GB of local storage for OpenSearch indices. The RAM required for a vector graph depends on the vector dimensions, number of vectors stored, and the algorithm chosen. See Choose the k-NN algorithm for your billion-scale use case with OpenSearch for a review and formulas to help you pre-calculate vector RAM needs for your OpenSearch Serverless deployment.
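As a rough illustration of RAM-bound sizing, the following sketch applies the commonly cited HNSW memory estimate from the OpenSearch k-NN documentation, 1.1 * (4 * dimension + 8 * M) bytes per vector, and divides by the 2 GB of graph RAM per full OCU described above. The function names, the default M of 16, the treatment of 2 GB as 2 GiB, and the sample workload are all assumptions. The result ignores redundancy and any service-side overhead, so treat it as a starting point only.

```python
import math

GIB = 1024 ** 3

def hnsw_graph_bytes(num_vectors, dimension, m=16):
    """Approximate HNSW graph memory: 1.1 * (4 * d + 8 * M) bytes per vector."""
    return 1.1 * (4 * dimension + 8 * m) * num_vectors

def ocus_for_graph(num_vectors, dimension, m=16, graph_ram_per_ocu=2 * GIB):
    """Full OCUs needed to hold the graph, at an assumed 2 GiB of graph RAM per OCU."""
    return math.ceil(hnsw_graph_bytes(num_vectors, dimension, m) / graph_ram_per_ocu)

# Hypothetical workload: one million 768-dimensional vectors
graph_bytes = hnsw_graph_bytes(1_000_000, 768)
ocus = ocus_for_graph(1_000_000, 768)
print(f"graph memory ~{graph_bytes / GIB:.2f} GiB, full OCUs for graphs: {ocus}")
```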

Note: Many of the behaviors of the system are explained as of June 2024. Check back in coming months as new innovations continue to drive down cost.

Supported AWS Regions

The support for the new OCU minimums for OpenSearch Serverless is now available in all regions that support OpenSearch Serverless. See AWS Regional Services List for more information about OpenSearch Service availability. See the documentation to learn more about OpenSearch Serverless.

Conclusion

The introduction of half OCUs gives you a significant reduction in the base costs of Amazon OpenSearch Serverless. If you have a smaller data set and limited usage, you can now take advantage of this lower cost. The cost-effective nature of this solution and the simplified management of search and analytics workloads help maintain smooth operation even as traffic demands vary.


About the authors 

Satish Nandi is a Senior Product Manager with Amazon OpenSearch Service. He is focused on OpenSearch Serverless and Geospatial and has years of experience in networking, security and ML and AI. He holds a BEng in Computer Science and an MBA in Entrepreneurship. In his free time, he likes to fly airplanes, hang glide, and ride his motorcycle.

Jon Handler is a Senior Principal Solutions Architect at Amazon Web Services based in Palo Alto, CA. Jon works closely with OpenSearch and Amazon OpenSearch Service, providing help and guidance to a broad range of customers who have search and log analytics workloads that they want to move to the AWS Cloud. Prior to joining AWS, Jon’s career as a software developer included four years of coding a large-scale, eCommerce search engine. Jon holds a Bachelor of the Arts from the University of Pennsylvania, and a Master of Science and a Ph.D. in Computer Science and Artificial Intelligence from Northwestern University.

Unlock scalability, cost-efficiency, and faster insights with large-scale data migration to Amazon Redshift

Post Syndicated from Chanpreet Singh original https://aws.amazon.com/blogs/big-data/unlock-scalability-cost-efficiency-and-faster-insights-with-large-scale-data-migration-to-amazon-redshift/

Large-scale data warehouse migration to the cloud is a complex and challenging endeavor that many organizations undertake to modernize their data infrastructure, enhance data management capabilities, and unlock new business opportunities. As data volumes continue to grow exponentially, traditional data warehousing solutions may struggle to keep up with the increasing demands for scalability, performance, and advanced analytics.

Migrating to Amazon Redshift offers organizations the potential for improved price-performance, enhanced data processing, faster query response times, and better integration with technologies such as machine learning (ML) and artificial intelligence (AI). However, you might face significant challenges when planning for a large-scale data warehouse migration. These challenges can range from ensuring data quality and integrity during the migration process to addressing technical complexities related to data transformation, schema mapping, performance, and compatibility issues between the source and target data warehouses. Additionally, organizations must carefully consider factors such as cost implications, security and compliance requirements, change management processes, and the potential disruption to existing business operations during the migration. Effective planning, thorough risk assessment, and a well-designed migration strategy are crucial to mitigating these challenges and implementing a successful transition to the new data warehouse environment on Amazon Redshift.

In this post, we discuss best practices for assessing, planning, and implementing a large-scale data warehouse migration into Amazon Redshift.

Success criteria for large-scale migration

The following diagram illustrates a scalable migration pattern for an extract, load, and transform (ELT) scenario using Amazon Redshift data sharing patterns.

The following diagram illustrates a scalable migration pattern for an extract, transform, and load (ETL) scenario.

Alignment on success criteria by all stakeholders (producers, consumers, operators, auditors) is key to a successful transition to a new Amazon Redshift modern data architecture. The success criteria are the key performance indicators (KPIs) for each component of the data workflow. This includes the ETL processes that capture source data, the functional refinement and creation of data products, the aggregation for business metrics, and the consumption from analytics, business intelligence (BI), and ML.

KPIs make sure you can track and audit optimal implementation, achieve consumer satisfaction and trust, and minimize disruptions during the final transition. They measure workload trends, cost usage, data flow throughput, consumer data rendering, and real-life performance. This makes sure the new data platform can meet current and future business goals.

Migration from a large-scale mission-critical monolithic legacy data warehouse (such as Oracle, Netezza, Teradata, or Greenplum) is typically planned and implemented over 6–16 months, depending on the complexity of the existing implementation. The monolithic data warehouse environments that have been built over the last 30 years contain proprietary business logic and multiple data design patterns, including an operational data store, star or snowflake schema, dimensions and facts, data warehouses and data marts, online transaction processing (OLTP) real-time dashboards, and online analytical processing (OLAP) cubes with multi-dimensional analytics. The data warehouse is highly business critical with minimal allowable downtime. If your data warehouse platform has gone through multiple enhancements over the years, your operational service levels documentation may not be current with the latest operational metrics and desired SLAs for each tenant (such as business unit, data domain, or organization group).

As part of the success criteria for operational service levels, you need to document the expected service levels for the new Amazon Redshift data warehouse environment. This includes the expected response time limits for dashboard queries or analytical queries, elapsed runtime for daily ETL jobs, desired elapsed time for data sharing with consumers, total number of tenants with concurrency of loads and reports, and mission-critical reports for executives or factory operations.

As part of your modern data architecture transition strategy, the migration goal of a new Amazon Redshift based platform is to use the scalability, performance, cost-optimization, and additional lake house capabilities of Amazon Redshift, thereby improving the existing data consumption experience. Depending on your enterprise’s culture and goals, your migration pattern of a legacy multi-tenant data platform to Amazon Redshift could use one of the following strategies:

A majority of organizations opt for the organic strategy (lift and shift) when migrating their large data platforms to Amazon Redshift. This approach uses AWS migration tools such as the AWS Schema Conversion Tool (AWS SCT) or the managed service version DMS Schema Conversion to rapidly meet goals around data center exit, cloud adoption, reducing legacy licensing costs, and replacing legacy platforms.

By establishing clear success criteria and monitoring KPIs, you can implement a smooth migration to Amazon Redshift that meets performance and operational goals. Thoughtful planning and optimization are crucial, including optimizing your Amazon Redshift configuration and workload management, addressing concurrency needs, implementing scalability, tuning performance for large result sets, minimizing schema locking, and optimizing join strategies. This will enable right-sizing the Redshift data warehouse to meet workload demands cost-effectively. Thorough testing and performance optimization will facilitate a smooth transition with minimal disruption to end-users, fostering exceptional user experiences and satisfaction. A successful migration can be accomplished through proactive planning, continuous monitoring, and performance fine-tuning, thereby aligning with and delivering on business objectives.

Migration involves the following phases, which we delve into in the subsequent sections:

  • Assessment
    • Discovery of workload and integrations
    • Dependency analysis
    • Effort estimation
    • Team sizing
    • Strategic wave planning
  • Functional and performance
    • Code conversion
    • Data validation
  • Measure and benchmark KPIs
    • Platform-level KPIs
    • Tenant-level KPIs
    • Consumer-level KPIs
    • Sample SQL
  • Monitoring Amazon Redshift performance and continual optimization
    • Identify top offending queries
    • Optimization strategies

To achieve a successful Amazon Redshift migration, it’s important to address these infrastructure, security, and deployment considerations simultaneously, thereby implementing a smooth and secure transition.

Assessment

In this section, we discuss the steps you can take in the assessment phase.

Discovery of workload and integrations

Conducting discovery and assessment for migrating a large on-premises data warehouse to Amazon Redshift is a critical step in the migration process. This phase helps identify potential challenges, assess the complexity of the migration, and gather the necessary information to plan and implement the migration effectively. You can use the following steps:

  • Data profiling and assessment – This involves analyzing the schema, data types, table sizes, and dependencies. Special attention should be given to complex data types such as arrays, JSON, or custom data types and custom user-defined functions (UDFs), because they may require specific handling during the migration process. Additionally, it’s essential to assess the volume of data and daily incremental data to be migrated, and estimate the required storage capacity in Amazon Redshift. Furthermore, analyzing the existing workload patterns, queries, and performance characteristics provides valuable insights into the resource requirements needed to optimize the performance of the migrated data warehouse in Amazon Redshift.
  • Code and query assessment – It’s crucial to assess the compatibility of existing SQL code, including queries, stored procedures, and functions. The AWS SCT can help identify any unsupported features, syntax, or functions that need to be rewritten or replaced to achieve a seamless integration with Amazon Redshift. Additionally, it’s essential to evaluate the complexity of the existing processes and determine if they require redesigning or optimization to align with Amazon Redshift best practices.
  • Performance and scalability assessment – This includes identifying performance bottlenecks, concurrency issues, or resource constraints that may be hindering optimal performance. This analysis helps determine the need for performance tuning or workload management techniques that may be required to achieve optimal performance and scalability in the Amazon Redshift environment.
  • Application integrations and mapping – Embarking on a data warehouse migration to a new platform necessitates a comprehensive understanding of the existing technology stack and business processes intertwined with the legacy data warehouse. Consider the following:
    • Meticulously document all ETL processes, BI tools, and scheduling mechanisms employed in conjunction with the current data warehouse. This includes commercial tools, custom scripts, and any APIs or connectors interfacing with source systems.
    • Take note of any custom code, frameworks, or mechanisms utilized in the legacy data warehouse for tasks such as managing slowly changing dimensions (SCDs), generating surrogate keys, implementing business logic, and other specialized functionalities. These components may require redevelopment or adaptation to operate seamlessly on the new platform.
    • Identify all upstream and downstream applications, as well as business processes that rely on the data warehouse. Map out their specific dependencies on database objects, tables, views, and other components. Trace the flow of data from its origins in the source systems, through the data warehouse, and ultimately to its consumption by reporting, analytics, and other downstream processes.
  • Security and access control assessment – This includes reviewing the existing security model, including user roles, permissions, access controls, data retention policies, and any compliance requirements and industry regulations that need to be adhered to.

Dependency analysis

Understanding dependencies between objects is crucial for a successful migration. You can use system catalog views and custom queries on your on-premises data warehouses to create a comprehensive object dependency report. This report shows how tables, views, and stored procedures rely on each other. This also involves analyzing indirect dependencies (for example, a view built on top of another view, which in turn uses a set of tables), and having a complete understanding of data usage patterns.
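
As a minimal sketch of how indirect dependencies can be resolved, the following Python example walks a map of direct dependencies to produce the full dependency list for each object. The `DIRECT_DEPS` map and all object names are hypothetical; in practice you would populate the map from the system catalog views of your source data warehouse.

```python
from collections import deque

# Hypothetical direct dependencies (object -> objects it reads from),
# standing in for the output of catalog queries on the source warehouse.
DIRECT_DEPS = {
    "sales_report_v": ["sales_summary_v"],
    "sales_summary_v": ["orders", "customers"],
    "load_orders_sp": ["orders", "staging_orders"],
}


def all_dependencies(obj, direct_deps):
    """Return every object that `obj` depends on, directly or indirectly."""
    seen = set()
    queue = deque(direct_deps.get(obj, []))
    while queue:
        current = queue.popleft()
        if current in seen:
            continue  # already visited; also protects against cycles
        seen.add(current)
        queue.extend(direct_deps.get(current, []))
    return seen


def dependency_report(direct_deps):
    """Map each object to its sorted list of direct and indirect dependencies."""
    return {obj: sorted(all_dependencies(obj, direct_deps)) for obj in direct_deps}


report = dependency_report(DIRECT_DEPS)
for obj, deps in sorted(report.items()):
    print(f"{obj}: {', '.join(deps)}")
```

In this sketch, the view built on another view (`sales_report_v`) is reported as depending on the underlying tables as well as on the intermediate view.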

Effort estimation

The discovery phase serves as your compass for estimating the migration effort. You can translate those insights into a clear roadmap as follows:

  • Object classification and complexity assessment – Based on the discovery findings, categorize objects (tables, views, stored procedures, and so on) based on their complexity. Simple tables with minimal dependencies will require less effort to migrate than intricate views or stored procedures with complex logic.
  • Migration tools – Use the AWS SCT to estimate the base migration effort per object type. The AWS SCT can automate schema conversion, data type mapping, and function conversion, reducing manual effort.
  • Additional considerations – Factor in additional tasks beyond schema conversion. This may include data cleansing, schema optimization for Amazon Redshift performance, unit testing of migrated objects, and migration script development for complex procedures. The discovery phase sheds light on potential schema complexities, allowing you to accurately estimate the effort required for these tasks.
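
The following Python sketch shows one way to turn a classified object inventory into an effort estimate. The per-object hours and the overhead factor are illustrative assumptions only; they are not values from the AWS SCT or from this post, and you would calibrate them from your own discovery findings.

```python
from collections import Counter

# Hypothetical base effort in person-hours per object, by type and complexity.
EFFORT_HOURS = {
    ("table", "simple"): 1,
    ("table", "complex"): 4,
    ("view", "simple"): 2,
    ("view", "complex"): 8,
    ("stored_procedure", "simple"): 8,
    ("stored_procedure", "complex"): 40,
}

# Hypothetical multiplier covering the additional tasks (data cleansing,
# schema optimization, unit testing, migration script development).
OVERHEAD_FACTOR = 1.5


def estimate_effort_hours(inventory, effort_hours=EFFORT_HOURS, overhead=OVERHEAD_FACTOR):
    """Return (base_hours, total_hours) for a list of (type, complexity) pairs."""
    counts = Counter(inventory)
    base = sum(effort_hours[key] * n for key, n in counts.items())
    return base, base * overhead


inventory = (
    [("table", "simple")] * 100
    + [("view", "complex")] * 10
    + [("stored_procedure", "complex")] * 5
)
base_hours, total_hours = estimate_effort_hours(inventory)
print(f"base: {base_hours} h, with additional tasks: {total_hours} h")
```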

Team sizing

With a clear picture of the effort estimate, you can now size the team for the migration.

Person-months calculation

Divide the total estimated effort by the desired project duration to determine the total person-months required. This provides a high-level understanding of the team size needed.

For example, for an ELT migration project from an on-premises data warehouse to Amazon Redshift to be completed within 6 months, we estimate the team requirements based on the number of schemas or tenants (for example, 30), number of database tables (for example, 5,000), average migration estimate for a schema (for example, 4 weeks based on complexity of stored procedures, tables and views, platform-specific routines, and materialized views), and number of business functions (for example, 2,000 segmented by simple, medium, and complex patterns). We can determine the following are needed:

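  • Migration time period (80% migration/20% for validation and transition) = 0.8 × 6 months = 4.8 months, or approximately 5 months (22 weeks)
  • Dedicated teams = (number of tenants × average migration period for a tenant) / (migration time period) = (30 × 1 month) / 5 months = 6 teams, where 1 month approximates the 4-week average migration estimate for a schema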
  • Migration team structure:
    • One to three data developers with stored procedure conversion expertise per team, performing over 25 conversions per week
    • One data validation engineer per team, testing over 50 objects per week
    • One to two data visualization experts per team, confirming consumer downstream applications are accurate and performant
  • A common shared DBA team with performance tuning expertise responding to standardization and challenges
  • A platform architecture team (3–5 individuals) focused on platform design, service levels, availability, operational standards, cost, observability, scalability, performance, and design pattern issue resolutions
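
The team-count arithmetic in the preceding example can be sketched in Python as follows. The function name and parameters are hypothetical; the inputs (30 tenants, 4 weeks per tenant, a 6-month project, and the 0.8 factor applied to the project duration) come from the example, and the result is rounded up because a partial team is not practical.

```python
import math

WEEKS_PER_MONTH = 52 / 12


def dedicated_teams(tenants, weeks_per_tenant, project_months, migration_share):
    """Estimate how many teams must work in parallel to migrate every tenant."""
    # Portion of the project available for migration work, in weeks.
    migration_weeks = project_months * migration_share * WEEKS_PER_MONTH
    # Total work to be done if tenants were migrated one after another.
    total_tenant_weeks = tenants * weeks_per_tenant
    return math.ceil(total_tenant_weeks / migration_weeks)


teams = dedicated_teams(
    tenants=30, weeks_per_tenant=4, project_months=6, migration_share=0.8
)
print(f"dedicated teams needed: {teams}")
```

With the example inputs, this yields 6 teams, which agrees with the estimate in the list above.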

Team composition expertise

Based on the skillsets required for various migration tasks, we assemble a team with the right expertise. Platform architects define a well-architected platform. Data engineers are crucial for schema conversion and data transformation, and DBAs can handle cluster configuration and workload monitoring. An engagement or project management team makes sure the project runs smoothly, on time, and within budget.

For example, for an ETL migration project from Informatica/Greenplum to a target Redshift lakehouse with an Amazon Simple Storage Service (Amazon S3) data lake to be completed within 12 months, we estimate the team requirements based on the number of schemas and tenants (for example, 50 schemas), number of database tables (for example, 10,000), average migration estimate for a schema (6 weeks based on complexity of database objects), and number of business functions (for example, 5,000 segmented by simple, medium, and complex patterns). We can determine the following are needed:

  • An open data format ingestion architecture processing the source dataset and refining the data in the S3 data lake. This requires a dedicated team of 3–7 members building a serverless data lake for all data sources. Ingestion migration implementation is segmented by tenants and type of ingestion patterns, such as internal database change data capture (CDC); data streaming, clickstream, and Internet of Things (IoT); public dataset capture; partner data transfer; and file ingestion patterns.
  • The migration team composition is tailored to the needs of a project wave. Depending on each migration wave and what is being done in the wave (development, testing, or performance tuning), the right people will be engaged. When the wave is complete, the people from that wave will move to another wave.
  • A loading team builds a producer-consumer architecture in Amazon Redshift to process concurrent near real-time publishing of data. This requires a dedicated team of 3–7 members building and publishing refined datasets in Amazon Redshift.
  • A shared DBA group of 3–5 individuals helping with schema standardization, migration challenges, and performance optimization outside the automated conversion.
  • Data transformation experts to convert database stored functions in the producer or consumer.
  • A migration sprint plan for 10 months with 2-week sprints and multiple waves to release tenants to the new architecture.
  • A validation team to confirm a reliable and complete migration.
  • One to two data visualization experts per team, confirming that consumer downstream applications are accurate and performant.
  • A platform architecture team (3–5 individuals) focused on platform design, service levels, availability, operational standards, cost, observability, scalability, performance, and design pattern issue resolutions.

Strategic wave planning

Migration waves can be determined as follows:

  • Dependency-based wave delineation – Objects can be grouped into migration waves based on their dependency relationships. Objects with no or minimal dependencies will be prioritized for earlier waves, whereas those with complex dependencies will be migrated in subsequent waves. This provides a smooth and sequential migration process.
  • Logical schema and business area alignment – You can further revise migration waves by considering logical schema and business areas. This allows you to migrate related data objects together, minimizing disruption to specific business functions.
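
Dependency-based wave delineation can be sketched as a layered topological sort: each wave contains the objects whose dependencies have all been migrated in earlier waves. The dependency map and object names in this Python example are hypothetical, and the sketch doesn't attempt the second step of regrouping waves by business area.

```python
# Hypothetical direct dependencies (object -> objects it reads from).
DEPS = {
    "sales_report_v": ["sales_summary_v"],
    "sales_summary_v": ["orders", "customers"],
    "load_orders_sp": ["orders", "staging_orders"],
}


def plan_waves(direct_deps):
    """Group objects into waves; each wave depends only on earlier waves."""
    # Collect every object, including ones that appear only as dependencies.
    objects = set(direct_deps)
    for deps in direct_deps.values():
        objects.update(deps)

    remaining = {obj: set(direct_deps.get(obj, [])) for obj in objects}
    migrated = set()
    waves = []
    while remaining:
        # An object is ready when all of its dependencies are already migrated.
        ready = sorted(obj for obj, deps in remaining.items() if deps <= migrated)
        if not ready:
            raise ValueError(f"circular dependency among: {sorted(remaining)}")
        waves.append(ready)
        migrated.update(ready)
        for obj in ready:
            del remaining[obj]
    return waves


waves = plan_waves(DEPS)
for number, wave in enumerate(waves, start=1):
    print(f"wave {number}: {', '.join(wave)}")
```

Objects with no dependencies land in the first wave, and a circular dependency is reported as an error so that it can be resolved manually.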

Functional and performance

In this section, we discuss the steps for refactoring the legacy SQL codebase to follow Amazon Redshift SQL best practices, building validation routines to confirm accuracy and completeness during the transition to Amazon Redshift, capturing KPIs to confirm similar or better service levels for consumption tools and downstream applications, and incorporating performance hooks and procedures for a scalable and performant Redshift platform.

Code conversion

We recommend using the AWS SCT as the first step in the code conversion journey. The AWS SCT is a powerful tool that can streamline the database schema and code migrations to Amazon Redshift. With its intuitive interface and automated conversion capabilities, the AWS SCT can significantly reduce the manual effort required during the migration process. Refer to Converting data warehouse schemas to Amazon Redshift using AWS SCT for instructions to convert your database schema, including tables, views, functions, and stored procedures, to Amazon Redshift format. For an Oracle source, you can also use the managed service version DMS Schema Conversion.

When the conversion is complete, the AWS SCT generates a detailed conversion report. This report highlights any potential issues, incompatibilities, or areas requiring manual intervention. Although the AWS SCT automates a significant portion of the conversion process, manual review and modifications are often necessary to address various complexities and optimizations.

Some common cases where manual review and modifications are typically required include:

  • Incompatible data types – The AWS SCT may not always handle custom or non-standard data types, requiring manual intervention to map them to compatible Amazon Redshift data types.
  • Database-specific SQL extensions or proprietary functions – If the source database uses SQL extensions or proprietary functions specific to the database vendor (for example, STRING_AGG() or ARRAY_UPPER functions, or custom UDFs for PostgreSQL), these may need to be manually rewritten or replaced with equivalent Amazon Redshift functions or UDFs. The AWS SCT extension pack is an add-on module that emulates functions present in a source database that are required when converting objects to the target database.
  • Performance optimization – Although the AWS SCT can convert the schema and code, manual optimization is often necessary to take advantage of the features and capabilities of Amazon Redshift. This may include adjusting distribution and sort keys, converting row-by-row operations to set-based operations, optimizing query plans, and other performance tuning techniques specific to Amazon Redshift.
  • Stored procedures and code conversion – The AWS SCT offers comprehensive capabilities to seamlessly migrate stored procedures and other code objects across platforms. Although its automated conversion process efficiently handles the majority of cases, certain intricate scenarios may necessitate manual intervention due to the complexity of the code and utilization of database-specific features or extensions. To achieve optimal compatibility and accuracy, it’s advisable to undertake testing and validation procedures during the migration process.

After you address the issues identified during the manual review process, it’s crucial to thoroughly test the converted stored procedures, as well as other database objects and code, such as views, functions, and SQL extensions, in a non-production Redshift cluster before deploying them in the production environment. This exercise is mostly undertaken by QA teams. This phase also involves conducting holistic performance testing (individual queries, batch loads, consumption reports and dashboards in BI tools, data mining applications, ML algorithms, and other relevant use cases) in addition to functional testing to make sure the converted code meets the required performance expectations. The performance tests should simulate production-like workloads and data volumes to validate the performance under realistic conditions.

Data validation

When migrating data from an on-premises data warehouse to a Redshift cluster on AWS, data validation is a crucial step to confirm the integrity and accuracy of the migrated data. There are several approaches you can consider:

  • Custom scripts – Use scripting languages like Python, SQL, or Bash to develop custom data validation scripts tailored to your specific data validation requirements. These scripts can connect to both the source and target databases, extract data, perform comparisons, and generate reports.
  • Open source tools – Use open source data validation tools like Amazon Deequ or Great Expectations. These tools provide frameworks and utilities for defining data quality rules, validating data, and generating reports.
  • AWS native or commercial tools – Use AWS native tools such as AWS Glue Data Quality or commercial data validation tools like Collibra Data Quality. These tools often provide comprehensive features, user-friendly interfaces, and dedicated support.

The following are different types of validation checks to consider:

  • Structural comparisons – Compare the list of columns and data types of columns between the source and target (Amazon Redshift). Any mismatches should be flagged.
  • Row count validation – Compare the row counts of each core table in the source data warehouse with the corresponding table in the target Redshift cluster. This is the most basic validation step to make sure no data has been lost or duplicated during the migration process.
  • Column-level validation – Validate individual columns by comparing column-level statistics (min, max, count, sum, average) for each column between the source and target databases. This can help identify any discrepancies in data values or data types.
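
The following Python sketch illustrates the three checks as a custom script. To stay self-contained, it uses two in-memory SQLite databases as stand-ins for the source data warehouse and the target Redshift cluster; the table, column, and function names are hypothetical. Against real systems, you would swap in the appropriate database drivers and read column definitions from each system's catalog instead of the SQLite-specific `PRAGMA`.

```python
import sqlite3


def column_types(conn, table):
    """Return {column_name: declared_type} for a table (SQLite-specific)."""
    rows = conn.execute(f"PRAGMA table_info({table})")
    return {row[1]: row[2].upper() for row in rows}


def table_stats(conn, table, column):
    """Return the row count and basic statistics for one numeric column."""
    sql = (
        f"SELECT COUNT(*), MIN({column}), MAX({column}), "
        f"SUM({column}), AVG({column}) FROM {table}"
    )
    row = conn.execute(sql).fetchone()
    return dict(zip(("row_count", "min", "max", "sum", "avg"), row))


def validate(source, target, table, column):
    """Return a list of human-readable mismatches (empty if all checks pass)."""
    issues = []
    if column_types(source, table) != column_types(target, table):
        issues.append("structure mismatch")
    src = table_stats(source, table, column)
    tgt = table_stats(target, table, column)
    for name in src:
        if src[name] != tgt[name]:
            issues.append(f"{name}: source={src[name]} target={tgt[name]}")
    return issues


def make_db(rows):
    """Build an in-memory database with a small hypothetical orders table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER, amount INTEGER)")
    conn.executemany("INSERT INTO orders VALUES (?, ?)", rows)
    return conn


source_db = make_db([(1, 10), (2, 20), (3, 30)])
target_db = make_db([(1, 10), (2, 20)])  # one row lost during migration
issues = validate(source_db, target_db, "orders", "amount")
print("\n".join(issues) or "all checks passed")
```

Table and column names are interpolated directly into the SQL text here, so this sketch should only be used with trusted identifiers.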

You can also consider the following validation strategies:

  • Data profiling – Perform data profiling on the source and target databases to understand the data characteristics, identify outliers, and detect potential data quality issues. For example, you can use the data profiling capabilities of AWS Glue Data Quality or Amazon Deequ.
  • Reconciliation reports – Produce detailed validation reports that highlight errors, mismatches, and data quality issues. Consider generating reports in various formats (CSV, JSON, HTML) for straightforward consumption and integration with monitoring tools.
  • Automate the validation process – Integrate the validation logic into your data migration or ETL pipelines using scheduling tools or workflow orchestrators like Apache Airflow or AWS Step Functions.
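
As a small illustration of a reconciliation report, the following Python sketch renders a list of check results as both JSON and CSV. The result fields (`table`, `check`, `source`, `target`) and the sample values are hypothetical, and the pass/fail rule is a simple equality comparison.

```python
import csv
import io
import json

FIELDS = ["table", "check", "source", "target", "status"]


def render_report(results):
    """Return (json_text, csv_text) for a list of validation check results."""
    rows = [
        dict(r, status="PASS" if r["source"] == r["target"] else "FAIL")
        for r in results
    ]
    json_text = json.dumps(rows, indent=2)

    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=FIELDS)
    writer.writeheader()
    writer.writerows(rows)
    return json_text, buffer.getvalue()


# Hypothetical results from earlier validation steps.
results = [
    {"table": "orders", "check": "row_count", "source": 3, "target": 3},
    {"table": "orders", "check": "sum(amount)", "source": 60, "target": 30},
]
json_report, csv_report = render_report(results)
print(csv_report)
```

A scheduled job could write these outputs to a shared location or send them to a monitoring tool after each migration wave.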

Lastly, keep in mind the following considerations for collaboration and communication:

  • Stakeholder involvement – Involve relevant stakeholders, such as business analysts, data owners, and subject matter experts, throughout the validation process to make sure business requirements and data quality expectations are met.
  • Reporting and sign-off – Establish a clear reporting and sign-off process for the validation results, involving all relevant stakeholders and decision-makers.

Measure and benchmark KPIs

For a multi-tenant Amazon Redshift implementation, KPIs are segmented at the platform level, tenant level, and consumption tools level. KPIs evaluate the operational metrics, cost metrics, and end-user response time metrics. In this section, we discuss the KPIs needed for achieving a successful transition.

Platform-level KPIs

As new tenants are gradually migrated to the platform, it’s imperative to monitor the current state of Amazon Redshift platform-level KPIs. The current state of these KPIs helps the platform team make the necessary scalability modifications (add nodes, add consumer clusters, add producer clusters, or increase Concurrency Scaling clusters). Amazon Redshift query monitoring rules (QMR) also help govern the overall state of the data platform, providing optimal performance for all tenants by managing outlier workloads.

The following table summarizes the relevant platform-level KPIs.

| Component | KPI | Service Level and Success Criteria |
|---|---|---|
| ETL | Ingestion data volume | Daily or hourly peak volume in GBps, number of objects, number of threads. |
| | Ingestion threads | Peak hourly ingestion threads (COPY or INSERT), number of dependencies, KPI segmented by tenants and domains. |
| | Stored procedure volume | Peak hourly stored procedure invocations segmented by tenants and domains. |
| | Concurrent load | Peak concurrent load supported by the producer cluster; distribution of ingestion pattern across multiple producer clusters using data sharing. |
| | Data sharing dependency | Data sharing between producer clusters (objects refreshed, locks per hour, waits per hour). |
| Workload | Number of queries | Peak hour query volume supported by cluster segmented by short (less than 10 seconds), medium (less than 60 seconds), long (less than 5 minutes), very long (less than 30 minutes), and outlier (more than 30 minutes); segmented by tenant, domain, or sub-domain. |
| | Number of queries per queue | Peak hour query volume supported by priority automatic WLM queue segmented by short (less than 10 seconds), medium (less than 60 seconds), long (less than 5 minutes), very long (less than 30 minutes), and outlier (more than 30 minutes); segmented by tenant, business group, domain, or sub-domain. |
| | Runtime pattern | Total runtime per hour; max, median, and average run pattern; segmented by service class across clusters. |
| | Wait time patterns | Total wait time per hour; max, median, and average wait pattern for queries waiting. |
| Performance | Leader node usage | Service level for leader node (recommended less than 80%). |
| | Compute node CPU usage | Service level for compute node (recommended less than 90%). |
| | Disk I/O usage per node | Service level for disk I/O per node. |
| | QMR rules | Number of outlier queries stopped by QMR (large scan, large spilling disk, large runtime); logging thresholds for potential large queries running more than 5 minutes. |
| | History of WLM queries | Historical trend of queries stored in historical archive table for all instances of queries in STL_WLM_QUERY; trend analysis over 30 days, 60 days, and 90 days to fine-tune the workload across clusters. |
| Cost | Total cost per month of Amazon Redshift platform | Service level for mix of instances (reserved, on-demand, serverless), cost of Concurrency Scaling, cost of Amazon Redshift Spectrum usage. Use AWS tools like AWS Cost Explorer or daily cost usage report to capture monthly costs for each component. |
| | Daily Concurrency Scaling usage | Service limits to monitor cost for concurrency scaling; invoke for outlier activity on spikes. |
| | Daily Amazon Redshift Spectrum usage | Service limits to monitor cost for using Amazon Redshift Spectrum; invoke for outlier activity. |
| | Redshift Managed Storage usage cost | Track usage of Redshift Managed Storage, monitoring wastage on temporary, archival, and old data assets. |
| Localization | Remote or on-premises tools | Service level for rendering large datasets to remote destinations. |
| | Data transfer to remote tools | Data transfer to BI tools or workstations outside the Redshift cluster VPC; separation of datasets to Amazon S3 using the unload feature, avoiding bottlenecks at leader node. |
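
The query duration bands used in the workload KPIs (short, medium, long, very long, and outlier) can be applied to captured query runtimes with a sketch like the following. The input format, a list of `(tenant, duration_seconds)` pairs, and the function names are assumptions for illustration; a query lasting exactly 30 minutes is counted as an outlier here, which the KPI definition leaves open.

```python
from collections import Counter

# Upper bounds in seconds for each band, taken from the workload KPI definitions.
BUCKETS = [("short", 10), ("medium", 60), ("long", 300), ("very long", 1800)]


def duration_bucket(seconds):
    """Return the name of the duration band that a query runtime falls into."""
    for name, upper_bound in BUCKETS:
        if seconds < upper_bound:
            return name
    return "outlier"


def peak_hour_profile(queries):
    """Count queries per (tenant, band) for one hour of captured runtimes."""
    profile = Counter()
    for tenant, seconds in queries:
        profile[(tenant, duration_bucket(seconds))] += 1
    return profile


# Hypothetical runtimes captured during a peak hour.
queries = [
    ("finance", 2.5),
    ("finance", 45),
    ("finance", 4000),
    ("marketing", 8),
    ("marketing", 9),
    ("marketing", 700),
]
profile = peak_hour_profile(queries)
for (tenant, band), count in sorted(profile.items()):
    print(f"{tenant:10} {band:10} {count}")
```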

Tenant-level KPIs

Tenant-level KPIs help capture current performance levels from the legacy system and document expected service levels for the data flow from the source capture to end-user consumption. The captured legacy KPIs assist in providing the best target modern Amazon Redshift platform (a single Redshift data warehouse, a lake house with Amazon Redshift Spectrum, and data sharing with the producer and consumer clusters). Cost usage tracking at the tenant level helps you spread the cost of a shared platform across tenants.

The following table summarizes the relevant tenant-level KPIs.

| Component | KPI | Service Level and Success Criteria |
|---|---|---|
| Cost | Compute usage by tenant | Track usage by tenant, business group, or domain; capture query volume by business unit associating Redshift user identity to internal business unit; data observability by consumer usage for data products helping with cost attribution. |
| ETL | Orchestration SLA | Service level for daily data availability. |
| | Runtime | Service level for data loading and transformation. |
| | Data ingestion volume | Peak expected volume for service level guarantee. |
| Query consumption | Response time | Response time SLA for query patterns (dashboards, SQL analytics, ML analytics, BI tool caching). |
| | Concurrency | Peak query consumers for tenant. |
| | Query volume | Peak hourly volume service levels and daily query volumes. |
| | Individual query response for critical data consumption | Service level and success criteria for critical workloads. |
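
One simple way to spread the cost of a shared platform across tenants is in proportion to each tenant's compute usage. The following Python sketch assumes you have already aggregated compute seconds per tenant (for example, by mapping Redshift user identities to business units); the figures and names are hypothetical, and other allocation rules are equally possible.

```python
def allocate_cost(monthly_cost, usage_seconds_by_tenant):
    """Split a monthly platform cost across tenants in proportion to usage."""
    total = sum(usage_seconds_by_tenant.values())
    if total == 0:
        raise ValueError("no usage recorded; cannot allocate cost")
    return {
        tenant: round(monthly_cost * seconds / total, 2)
        for tenant, seconds in usage_seconds_by_tenant.items()
    }


# Hypothetical monthly cost and per-tenant compute seconds.
usage = {"finance": 600, "marketing": 300, "operations": 100}
shares = allocate_cost(10_000, usage)
for tenant, cost in shares.items():
    print(f"{tenant}: {cost}")
```

Because each share is rounded to two decimal places, the shares may differ from the total by a few cents for less convenient numbers.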

Consumer-level KPIs

A multi-tenant modern data platform can set service levels for a variety of consumer tools. The service levels provide guidance to end-users of the capability of the new deployment.

The following table summarizes the relevant consumer-level KPIs.

| Consumer | KPI | Service Level and Success Criteria |
|---|---|---|
| BI tools | Large data extraction | Service level for unloading data for caching or query rendering a large result dataset. |
| Dashboards | Response time | Service level for data refresh. |
| SQL query tools | Response time | Service level for response time by query type. |
| | Concurrency | Service level for concurrent query access by all consumers. |
| One-time analytics | Response time | Service level for large data unloads or aggregation. |
| ML analytics | Response time | Service level for large data unloads or aggregation. |

Sample SQL

The post includes sample SQL to capture daily KPI metrics. The following example KPI dashboard trends assist in capturing historic workload patterns, identifying deviations in workload, and providing guidance on the platform workload capacity to meet the current workload and anticipated growth patterns.

The following figure shows a daily query volume snapshot (queries per day and queued queries per day, which waited a minimum of 5 seconds).

The following figure shows a daily usage KPI. It monitors percentage waits and median wait for waiting queries (identifies the minimal threshold for wait to compute waiting queries and median of all wait times to infer deviation patterns).

The following figure illustrates concurrency usage (monitors concurrency compute usage for Concurrency Scaling clusters).

The following figure shows a 30-day pattern (computes volume in terms of total runtime and total wait time).
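
The daily wait metrics described in this section (queued queries, percentage of waits, and median wait for waiting queries) could be computed along the following lines once per-query wait times have been captured. This Python sketch is not the sample SQL from the post; the input list and function name are hypothetical, and the 5-second threshold mirrors the minimum wait used for the daily query volume snapshot.

```python
from statistics import median

# Queries count as queued if they waited at least this many seconds.
WAIT_THRESHOLD_SECONDS = 5


def daily_wait_kpi(wait_seconds, threshold=WAIT_THRESHOLD_SECONDS):
    """Summarize one day of per-query wait times into wait KPIs."""
    waits = list(wait_seconds)
    queued = [w for w in waits if w >= threshold]
    return {
        "queries": len(waits),
        "queued_queries": len(queued),
        "pct_queued": round(100 * len(queued) / len(waits), 1) if waits else 0.0,
        # Median over waiting queries only, so idle-free queries don't mask delays.
        "median_wait_seconds": median(queued) if queued else 0,
    }


# Hypothetical wait times (in seconds) for one day's queries.
kpi = daily_wait_kpi([0, 1, 6, 12, 0, 30, 2, 0])
print(kpi)
```

Tracking these values per day makes deviations from the usual wait pattern easier to spot as new tenants are onboarded.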

Monitoring Redshift performance and continual optimization

Amazon Redshift uses automatic table optimization (ATO) to choose the right distribution style, sort keys, and encoding when you create a table with AUTO options. Therefore, it’s a good practice to take advantage of the AUTO feature and create tables with DISTSTYLE AUTO, SORTKEY AUTO, and ENCODE AUTO. When tables are created with AUTO options, Amazon Redshift initially creates tables with optimal keys for the best first-time query performance possible using information such as the primary key and data types. In addition, Amazon Redshift analyzes the data volume and query usage patterns to evolve the distribution strategy and sort keys to optimize performance over time. Finally, Amazon Redshift performs table maintenance activities on your tables that reduce fragmentation and make sure statistics are up to date.
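
As an illustration of the AUTO options, the following Python sketch generates a `CREATE TABLE` statement that leaves distribution, sorting, and encoding to Amazon Redshift. The helper function, table name, and columns are hypothetical; note that the table attribute for automatic encoding in the `CREATE TABLE` syntax is written `ENCODE AUTO`.

```python
def create_table_auto_ddl(table, columns):
    """Build a CREATE TABLE statement that uses the Redshift AUTO options."""
    column_sql = ",\n    ".join(f"{name} {dtype}" for name, dtype in columns)
    return (
        f"CREATE TABLE {table} (\n"
        f"    {column_sql}\n"
        ")\n"
        "DISTSTYLE AUTO\n"
        "SORTKEY AUTO\n"
        "ENCODE AUTO;"
    )


# Hypothetical table definition for illustration.
ddl = create_table_auto_ddl(
    "sales.orders",
    [
        ("order_id", "BIGINT"),
        ("order_date", "DATE"),
        ("amount", "DECIMAL(12,2)"),
    ],
)
print(ddl)
```

Generating DDL this way during code conversion is one option for applying the AUTO settings consistently across migrated tables; converted DDL that hard-codes legacy distribution or sort choices would need separate review.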

During a large, phased migration, it’s important to monitor and measure Amazon Redshift performance against target KPIs at each phase and implement continual optimization. As new workloads are onboarded at each phase of the migration, it’s recommended to perform regular Redshift cluster reviews and analyze query patterns and performance. Cluster reviews can be done by engaging the Amazon Redshift specialist team through AWS Enterprise Support or your AWS account team. The goals of a cluster review include the following:

  • Use cases – Review the application use cases and determine if the design is suitable to solve for those use cases.
  • End-to-end architecture – Assess the current data pipeline architecture (ingestion, transformation, and consumption). For example, determine if too many small inserts are occurring and review the ETL pipeline. Determine if integration with other AWS services can be useful, such as AWS Lake Formation, Amazon Athena, Redshift Spectrum, or Amazon Redshift federation with PostgreSQL and MySQL.
  • Data model design – Review the data model and table design and provide recommendations for sort and distribution keys, keeping in mind best practices.
  • Performance – Review cluster performance metrics. Identify bottlenecks or irregularities and suggest recommendations. Dive deep into specific long-running queries to identify solutions specific to the customer’s workload.
  • Cost optimization – Provide recommendations to reduce costs where possible.
  • New features – Stay up to date with the new features in Amazon Redshift and identify where they can be used to meet these goals.

New workloads can introduce query patterns that could impact performance and miss target SLAs. A number of factors can affect query performance. In the following sections, we discuss aspects impacting query speed and optimizations for improving Redshift cluster performance.

Identify top offending queries

A compute node is partitioned into slices. More nodes means more processors and more slices, which enables you to redistribute the data as needed across the slices. However, more nodes also means greater expense, so you will need to find the balance of cost and performance that is appropriate for your system. For more information on Redshift cluster architecture, see Data warehouse system architecture. Each node type offers different sizes and limits to help you scale your cluster appropriately. The node size determines the storage capacity, memory, CPU, and price of each node in the cluster. For more information on node types, see Amazon Redshift pricing.

Redshift Test Drive is an open source tool that lets you evaluate which different data warehouse configuration options are best suited for your workload. We created Redshift Test Drive from Simple Replay and Amazon Redshift Node Configuration Comparison (see Compare different node types for your workload using Amazon Redshift for more details) to provide a single entry point for finding the best Amazon Redshift configuration for your workload. Redshift Test Drive also provides additional features such as a self-hosted analysis UI and the ability to replicate external objects that a Redshift workload may interact with. With Amazon Redshift Serverless, you can start with a base Redshift Processing Unit (RPU), and Redshift Serverless automatically scales based on your workload needs.

Optimization strategies

If you choose to fine-tune manually, the following are key concepts and considerations:

  • Data distribution – Amazon Redshift stores table data on the compute nodes according to a table’s distribution style. When you run a query, the query optimizer redistributes the data to the compute nodes as needed to perform any joins and aggregations. Choosing the right distribution style for a table helps minimize the impact of the redistribution step by locating the data where it needs to be before the joins are performed. For more information, see Working with data distribution styles.
  • Data sort order – Amazon Redshift stores table data on disk in sorted order according to a table’s sort keys. The query optimizer and query processor use the information about where the data is located to reduce the number of blocks that need to be scanned and thereby improve query speed. For more information, see Working with sort keys.
  • Dataset size – A higher volume of data in the cluster can slow query performance, because more rows need to be scanned and redistributed. You can mitigate this effect by regularly vacuuming and archiving data, and by using a predicate (a condition in the WHERE clause) to restrict the query dataset.
  • Concurrent operations – Amazon Redshift offers a powerful feature called automatic workload management (WLM) with query priorities, which enhances query throughput and overall system performance. By intelligently managing multiple concurrent operations and allocating resources dynamically, automatic WLM makes sure high-priority queries receive the necessary resources promptly, while lower-priority queries are processed efficiently without compromising system stability. This advanced queuing mechanism allows Amazon Redshift to optimize resource utilization, minimizing potential bottlenecks and maximizing query throughput, ultimately delivering a seamless and responsive experience for users running multiple operations simultaneously.
  • Query structure – How your query is written will affect its performance. As much as possible, write queries to process and return as little data as will meet your needs. For more information, see Amazon Redshift best practices for designing queries.
  • Queries with a long return time – Queries with a long return time can impact the processing of other queries and overall performance of the cluster. It’s critical to identify and optimize them. You can optimize these queries by either moving clients to the same network or using the UNLOAD feature of Amazon Redshift, and then configure the client to read the output from Amazon S3. To identify percentile and top running queries, you can download the sample SQL notebook system queries. You can import this in Query Editor V2.0.
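
As a rough illustration of what identifying percentile and top running queries means, the following Python sketch computes nearest-rank percentiles and the slowest queries from a set of runtimes. It is not the sample SQL notebook mentioned above; the query IDs and runtimes are hypothetical values, and in practice you would export them from the Redshift system views.

```python
import math


def percentile(values, pct):
    """Nearest-rank percentile of a non-empty list (pct between 0 and 100)."""
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def top_queries(runtimes_by_query, n=3):
    """Return the n longest-running (query_id, seconds) pairs, slowest first."""
    ranked = sorted(runtimes_by_query.items(), key=lambda kv: kv[1], reverse=True)
    return ranked[:n]


# Hypothetical runtimes in seconds, keyed by query ID.
sample = {
    101: 2.0, 102: 45.0, 103: 3.5, 104: 120.0, 105: 1.2,
    106: 8.0, 107: 300.0, 108: 4.1, 109: 2.7, 110: 15.0,
}

p50 = percentile(list(sample.values()), 50)
p90 = percentile(list(sample.values()), 90)
slowest = top_queries(sample, n=3)
print(p50, p90, slowest)
```

A large gap between the median and the 90th percentile, as in this sample, suggests that a few queries dominate total runtime and are the first candidates for tuning.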

Conclusion

In this post, we discussed best practices for assessing, planning, and implementing a large-scale data warehouse migration into Amazon Redshift.

The assessment phase of a data migration project is critical for implementing a successful migration. It involves a comprehensive analysis of the existing workload, integrations, and dependencies to accurately estimate the effort required and determine the appropriate team size. Strategic wave planning is crucial for prioritizing and scheduling the migration tasks effectively. Establishing KPIs and benchmarking them helps measure progress and identify areas for improvement. Code conversion and data validation processes validate the integrity of the migrated data and applications. Monitoring Amazon Redshift performance, identifying and optimizing top offending queries, and conducting regular cluster reviews are essential for maintaining optimal performance and addressing any potential issues promptly.

By addressing these key aspects, organizations can seamlessly migrate their data workloads to Amazon Redshift while minimizing disruptions and maximizing the benefits of Amazon Redshift.

We hope this post provides you with valuable guidance. We welcome any thoughts or questions in the comments section.


About the authors

Chanpreet Singh is a Senior Lead Consultant at AWS, specializing in Data Analytics and AI/ML. He has over 17 years of industry experience and is passionate about helping customers build scalable data warehouses and big data solutions. In his spare time, Chanpreet loves to explore nature, read, and spend time with his family.

Harshida Patel is an Analytics Specialist Principal Solutions Architect with AWS.

Raza Hafeez is a Senior Product Manager at Amazon Redshift. He has over 13 years of professional experience building and optimizing enterprise data warehouses and is passionate about enabling customers to realize the power of their data. He specializes in migrating enterprise data warehouses to AWS Modern Data Architecture.

Ram Bhandarkar is a Principal Data Architect at AWS based out of Northern Virginia. He helps customers with planning future Enterprise Data Strategy and assists them with transition to Modern Data Architecture platform on AWS. He has worked with building and migrating databases, data warehouses and data lake solutions for over 25 years.

Vijay Bagur is a Sr. Technical Account Manager. He works with enterprise customers to modernize and cost optimize workloads, improve security posture, and helps them build reliable and secure applications on the AWS platform. Outside of work, he loves spending time with his family, biking and traveling.

Synchronize data lakes with CDC-based UPSERT using open table format, AWS Glue, and Amazon MSK

Post Syndicated from Shubham Purwar original https://aws.amazon.com/blogs/big-data/synchronize-data-lakes-with-cdc-based-upsert-using-open-table-format-aws-glue-and-amazon-msk/

In the current industry landscape, data lakes have become a cornerstone of modern data architecture, serving as repositories for vast amounts of structured and unstructured data. Change data capture (CDC) refers to the process of identifying and capturing changes made to data in a database and then delivering those changes to a downstream system. Capturing every change from transactions in a source database and moving them to the target keeps the systems synchronized, and helps with analytics use cases and zero-downtime database migrations.

However, efficiently managing and synchronizing data within these lakes presents a significant challenge. Maintaining data consistency and integrity across distributed data lakes is crucial for decision-making and analytics. Inaccurate or outdated data can lead to flawed insights and business decisions. Businesses require synchronized data to gain actionable insights and respond swiftly to changing market conditions. Scalability is a critical concern for data lakes, because they need to accommodate growing volumes of data without compromising performance or incurring exorbitant costs.

To address these issues effectively, we propose using Amazon Managed Streaming for Apache Kafka (Amazon MSK), a fully managed Apache Kafka service that offers a seamless way to ingest and process streaming data. We use MSK Connect, an AWS managed service for deploying and running Kafka Connect, to build an end-to-end CDC application. The application uses the Debezium MySQL connector to capture insert, update, and delete records from MySQL, and a Confluent Amazon Simple Storage Service (Amazon S3) sink connector to write them to Amazon S3 as raw data that other downstream applications can consume for further use cases. To process batch data effectively, we use AWS Glue, a serverless data integration service that uses the Spark framework to process the data from Amazon S3 and copy it to the open table format layer. An open table format manages large collections of files as tables and supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. We chose Delta Lake as an example open table format, but you can achieve the same results using Apache Iceberg or Apache Hudi.

The post illustrates the construction of a comprehensive CDC system, enabling the processing of CDC data sourced from Amazon Relational Database Service (Amazon RDS) for MySQL. Initially, we create a raw data lake of all modified records in the database in near real time using Amazon MSK and write to Amazon S3 as raw data. This raw data can then be used to build a data warehouse or even a special type of data storage that’s optimized for analytics, such as a Delta Lake on S3. Later, we use an AWS Glue extract, transform, and load (ETL) job for batch processing of CDC data from the S3 raw data lake. A key advantage of this setup is that you have complete control over the entire process, from capturing the changes in your database to transforming the data for your specific needs. This flexibility allows you to adapt the system to different use cases.

This is achieved through integration with MSK Connect using the Debezium MySQL connector, followed by writing data to Amazon S3 facilitated by the Confluent S3 Sink Connector. Subsequently, the data is processed from S3 using an AWS Glue ETL job, and then stored in the data lake layer. Finally, the Delta Lake table is queried using Amazon Athena.

Note: If you require real-time data processing of the CDC data, you can bypass the batch approach and use an AWS Glue streaming job instead. This job would directly connect to the Kafka topic in MSK, grabbing the data as soon as changes occur. It can then process and transform the data as needed, creating a Delta Lake on Amazon S3 that reflects the latest updates according to your business needs. This approach ensures you have the most up-to-date data available for real-time analytics.

Solution overview

The following diagram illustrates the architecture that you implement through this blog post. Each number represents a major component of the solution.

The workflow consists of the following:

  1. Near real-time data capture from MySQL and streaming to Amazon S3
    1. The process starts with data originating from Amazon RDS for MySQL.
    2. A Debezium connector is used to capture changes to the data in the RDS instance in near real time. Debezium is a distributed platform that converts information from your existing databases into event streams, enabling applications to detect and immediately respond to row-level changes in the databases. Debezium is built on top of Apache Kafka and provides a set of Kafka Connect compatible connectors.
    3. The captured data changes are then streamed to an Amazon MSK topic. MSK is a managed service that simplifies running Apache Kafka on AWS.
    4. The processed data stream (topic) is streamed from MSK to Amazon S3 in JSON format. The Confluent S3 Sink Connector allows near real-time data transfer from an MSK cluster to an S3 bucket.
  2. Batch processing the CDC raw data and writing it into the data lake
    1. Set up an AWS Glue ETL job to process the raw CDC data.
    2. This job reads bookmarked data from an S3 raw bucket and writes into the data lake in open table format (Delta). The job also creates the Delta Lake table in AWS Glue Data Catalog.
    3. Delta Lake is an open-source storage layer built on top of existing data lakes. It adds functionalities like ACID transactions and versioning to improve data reliability and manageability.
  3. Analyze the data using serverless interactive query service
    1. Athena, a serverless interactive query service, can be used to query the Delta Lake table created in Glue Data Catalog. This allows for interactive data analysis without managing infrastructure.
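
To make the upsert step of this workflow more concrete, the following Python sketch applies a sequence of change events to an in-memory table. It assumes events shaped like Debezium's change event envelope, with an op code (c for create, r for snapshot read, u for update, d for delete) and before and after row images. Whether the files in your S3 bucket carry the full envelope depends on how the connectors are configured. The actual AWS Glue job uses Spark and Delta Lake rather than plain dictionaries, so treat this only as an illustration of the logic.

```python
def apply_changes(table, events, key="CUST_ID"):
    """Apply Debezium-style change events to a dict keyed by primary key."""
    for event in events:
        op = event["op"]
        if op in ("c", "r", "u"):
            # Create, snapshot read, and update all become an upsert.
            row = event["after"]
            table[row[key]] = row
        elif op == "d":
            # Delete removes the row if it is present.
            table.pop(event["before"][key], None)
        else:
            raise ValueError(f"unknown op: {op!r}")
    return table


# Hypothetical events in the order they would arrive from the topic.
events = [
    {"op": "c", "before": None, "after": {"CUST_ID": 8887, "NAME": "Customer Name 8887"}},
    {"op": "c", "before": None, "after": {"CUST_ID": 8888, "NAME": "Customer Name 8888"}},
    {"op": "u", "before": {"CUST_ID": 8888, "NAME": "Customer Name 8888"},
     "after": {"CUST_ID": 8888, "NAME": "Customer Name update 8888"}},
    {"op": "d", "before": {"CUST_ID": 8887, "NAME": "Customer Name 8887"}, "after": None},
]

result = apply_changes({}, events)
print(result)
```

Applying the events in arrival order matters: the table ends up with only the latest image of each key, which is the state a reader of the data lake table should see.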

For this post, we create the solution resources in the us-east-1 AWS Region using AWS CloudFormation templates. In the following sections, we show you how to configure your resources and implement the solution.

Configure resources with AWS CloudFormation

In this post, you use the following two CloudFormation templates. The advantage of using two different templates is that you can decouple the resource creation of the CDC pipeline and AWS Glue processing according to your use case, and if you have requirements to create specific process resources only.

  1. vpc-msk-mskconnect-rds-client.yaml – This template sets up the CDC pipeline resources such as a virtual private cloud (VPC), subnet, security group, AWS Identity and Access Management (IAM) roles, NAT, internet gateway, Amazon Elastic Compute Cloud (Amazon EC2) client, Amazon MSK, MSK Connect, RDS, and S3 buckets.
  2. gluejob-setup.yaml – This template sets up the data processing resources such as the AWS Glue table, database, and ETL job.

Configure MSK and MSK Connect

To start, you’ll configure MSK and MSK Connect using the Debezium connector to capture incremental changes in the table and write them to Amazon S3 using an S3 sink connector. The vpc-msk-mskconnect-rds-client.yaml stack creates a VPC, private and public subnets, security groups, S3 buckets, an Amazon MSK cluster, an EC2 instance with a Kafka client, an RDS database, and MSK connectors with their worker configurations.

  1. Launch the stack vpc-msk-mskconnect-rds-client using the CloudFormation template.
  2. Provide the parameter values as listed in the following table.

| Parameter | Description | Sample value |
| --- | --- | --- |
| EnvironmentName | An environment name that is prefixed to resource names. | msk-delta-cdc-pipeline |
| DatabasePassword | Database admin account password. | S3cretPwd99 |
| InstanceType | MSK client EC2 instance type. | t2.micro |
| LatestAmiId | Latest AMI ID of Amazon Linux 2023 for EC2 instance. You can use the default value. | /aws/service/ami-amazon-linux-latest/al2023-ami-kernel-6.1-x86_64 |
| VpcCIDR | IP range (CIDR notation) for this VPC. | 10.192.0.0/16 |
| PublicSubnet1CIDR | IP range (CIDR notation) for the public subnet in the first Availability Zone. | 10.192.10.0/24 |
| PublicSubnet2CIDR | IP range (CIDR notation) for the public subnet in the second Availability Zone. | 10.192.11.0/24 |
| PrivateSubnet1CIDR | IP range (CIDR notation) for the private subnet in the first Availability Zone. | 10.192.20.0/24 |
| PrivateSubnet2CIDR | IP range (CIDR notation) for the private subnet in the second Availability Zone. | 10.192.21.0/24 |
| PrivateSubnet3CIDR | IP range (CIDR notation) for the private subnet in the third Availability Zone. | 10.192.22.0/24 |

  3. The stack creation process can take approximately one hour to complete. Check the Outputs tab for the stack after the stack is created.
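
If you change the sample CIDR values, it can help to check them before launching the stack. The following Python sketch uses the standard ipaddress module to confirm that every subnet range falls inside the VPC range and that no two subnets overlap. The check_subnets helper is a hypothetical name introduced for this illustration and is not part of the CloudFormation template.

```python
import ipaddress
from itertools import combinations


def check_subnets(vpc_cidr, subnet_cidrs):
    """Return (subnets outside the VPC, pairs of overlapping subnets)."""
    vpc = ipaddress.ip_network(vpc_cidr)
    nets = {name: ipaddress.ip_network(cidr) for name, cidr in subnet_cidrs.items()}
    outside = [name for name, net in nets.items() if not net.subnet_of(vpc)]
    overlapping = [
        (a, b)
        for (a, net_a), (b, net_b) in combinations(nets.items(), 2)
        if net_a.overlaps(net_b)
    ]
    return outside, overlapping


# Sample values from the parameter list above.
sample_subnets = {
    "PublicSubnet1CIDR": "10.192.10.0/24",
    "PublicSubnet2CIDR": "10.192.11.0/24",
    "PrivateSubnet1CIDR": "10.192.20.0/24",
    "PrivateSubnet2CIDR": "10.192.21.0/24",
    "PrivateSubnet3CIDR": "10.192.22.0/24",
}

outside, overlapping = check_subnets("10.192.0.0/16", sample_subnets)
print(outside, overlapping)
```

Both lists are empty for the sample values, which means the ranges are consistent with each other.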

Next, you set up the AWS Glue data processing resources such as the AWS Glue database, table, and ETL job.

Implement UPSERT on an S3 data lake with Delta Lake using AWS Glue

The gluejob-setup.yaml CloudFormation template creates a database, IAM role, and AWS Glue ETL job. Retrieve the values for S3BucketNameForOutput and S3BucketNameForScript from the vpc-msk-mskconnect-rds-client stack’s Outputs tab to use in this template. Complete the following steps:

  1. Launch the stack gluejob-setup.
  2. Provide parameter values as listed in the following table.

| Parameter | Description | Sample value |
| --- | --- | --- |
| EnvironmentName | Environment name that is prefixed to resource names. | gluejob-setup |
| GlueDataBaseName | Name of the Data Catalog database. | glue_cdc_blog_db |
| GlueTableName | Name of the Data Catalog table. | blog_cdc_tbl |
| S3BucketForGlueScript | Bucket name for the AWS Glue ETL script. Use the S3 bucket name from the previous stack. | aws-gluescript-${AWS::AccountId}-${AWS::Region}-${EnvironmentName} |
| GlueWorkerType | Worker type for AWS Glue job. For example, G.1X. | G.1X |
| NumberOfWorkers | Number of workers in the AWS Glue job. | 3 |
| S3BucketForOutput | Bucket name for writing data from the AWS Glue job. | aws-glueoutput-${AWS::AccountId}-${AWS::Region}-${EnvironmentName} |
| S3ConnectorTargetBucketname | Bucket name where the Amazon MSK S3 sink connector writes the data from the Kafka topic. | msk-lab-${AWS::AccountId}-target-bucket |

  3. The stack creation process can take approximately 2 minutes to complete. Check the Outputs tab for the stack after the stack is created.

In the gluejob-setup stack, we created an AWS Glue database and AWS Glue job. For further clarity, you can examine the AWS Glue database and job generated using the CloudFormation template.

After successfully creating the CloudFormation stack, you can proceed with processing data using the AWS Glue ETL job.

Run the AWS Glue ETL job

To process the data created in the S3 bucket from Amazon MSK using the AWS Glue ETL job that you set up in the previous section, complete the following steps:

  1. On the CloudFormation console, choose the stack gluejob-setup.
  2. On the Outputs tab, retrieve the name of the AWS Glue ETL job from the GlueJobName key. In the following screenshot, the name is GlueCDCJob-glue-delta-cdc.
  3. On the AWS Glue console, choose ETL jobs in the navigation pane.
  4. Search for the AWS Glue ETL job named GlueCDCJob-glue-delta-cdc.
  5. Choose the job name to open its details page.
  6. Choose Run to start the job. On the Runs tab, confirm that the job ran without failure.
  7. Retrieve the OutputBucketName from the gluejob-setup template output.
  8. On the Amazon S3 console, navigate to the S3 bucket to verify the data.

Note: We have enabled the AWS Glue job bookmark, which makes sure the job processes only the new data in each job run.
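
The effect of a job bookmark can be sketched in a few lines of Python. This is not how AWS Glue implements bookmarks internally. It only illustrates the observable behavior of remembering what a previous run processed and skipping it in the next run. The object keys and the process_new_objects helper are hypothetical.

```python
def process_new_objects(object_keys, bookmark):
    """Return the keys not seen in earlier runs and the updated bookmark."""
    new_keys = [key for key in object_keys if key not in bookmark]
    return new_keys, bookmark | set(new_keys)


# First run: nothing has been processed yet, so every object is new.
first_run, bookmark = process_new_objects(
    ["raw/part-0001.json", "raw/part-0002.json"], set()
)

# Second run: one more object has arrived, and only that one is processed.
second_run, bookmark = process_new_objects(
    ["raw/part-0001.json", "raw/part-0002.json", "raw/part-0003.json"], bookmark
)
print(first_run, second_run)
```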

Query the Delta Lake table using Athena

After the AWS Glue ETL job has successfully created the Delta Lake table for the processed data in the Data Catalog, follow these steps to validate the data using Athena:

  1. On the Athena console, navigate to the query editor.
  2. Choose the Data Catalog as the data source.
  3. Choose the database glue_cdc_blog_db created using gluejob-setup stack.
  4. To validate the data, run the following query to preview the data and find the total count.
SELECT * FROM "glue_cdc_blog_db"."blog_cdc_tbl" ORDER BY cust_id DESC LIMIT 40;
SELECT COUNT(*) FROM "glue_cdc_blog_db"."blog_cdc_tbl";

The following screenshot shows the output of our example query.

Upload incremental (CDC) data for further processing

After we process the initial full load, let’s insert, update, and delete records in MySQL. The Debezium MySQL connector processes these changes, and the Confluent S3 sink connector writes them to Amazon S3.

  1. On the Amazon EC2 console, go to the EC2 instance named KafkaClientInstance that you created using the CloudFormation template.
  2. Sign in to the EC2 instance using SSM. Select KafkaClientInstance and then choose Connect.
  3. Run the following commands to connect to the RDS database. When prompted, enter the database password from the Parameters tab of the CloudFormation stack.
sudo su - ec2-user
RDS_AURORA_ENDPOINT=$(aws rds describe-db-instances --region us-east-1 | jq -r '.DBInstances[] | select(.DBName == "salesdb") | .Endpoint.Address')
mysql -f -u master -h "$RDS_AURORA_ENDPOINT" --password
  4. Now perform the insert into the CUSTOMER table.
use salesdb;
INSERT into CUSTOMER values(8887,'Customer Name 8887','Market segment 8887');
INSERT into CUSTOMER values(8888,'Customer Name 8888','Market segment 8888');
INSERT into CUSTOMER values(8889,'Customer Name 8889','Market segment 8889');
  5. Run the AWS Glue job again to update the Delta Lake table with new records.
  6. Use the Athena console to validate the data.
  7. Perform the insert, update, and delete in the CUSTOMER table.
    UPDATE CUSTOMER SET NAME='Customer Name update 8888',MKTSEGMENT='Market segment update 8888' where CUST_ID = 8888;
    UPDATE CUSTOMER SET NAME='Customer Name update 8889',MKTSEGMENT='Market segment update 8889' where CUST_ID = 8889;
    DELETE FROM CUSTOMER where CUST_ID = 8887;
    INSERT into CUSTOMER values(9000,'Customer Name 9000','Market segment 9000');
  8. Run the AWS Glue job again to update the Delta Lake table with the insert, update, and delete records.
  9. Use the Athena console to verify the updated and deleted records in the Delta Lake table.
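
To know what to look for when validating in Athena, you can replay the same statements against a local in-memory SQLite database. The following Python sketch does this with the standard sqlite3 module. The three-column table definition is an assumption inferred from the INSERT and UPDATE statements above, and SQLite stands in for MySQL only because these particular statements behave the same way in both.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Assumed schema, inferred from the statements used in this post.
conn.execute(
    "CREATE TABLE CUSTOMER (CUST_ID INTEGER PRIMARY KEY, NAME TEXT, MKTSEGMENT TEXT)"
)

statements = [
    "INSERT into CUSTOMER values(8887,'Customer Name 8887','Market segment 8887')",
    "INSERT into CUSTOMER values(8888,'Customer Name 8888','Market segment 8888')",
    "INSERT into CUSTOMER values(8889,'Customer Name 8889','Market segment 8889')",
    "UPDATE CUSTOMER SET NAME='Customer Name update 8888',"
    "MKTSEGMENT='Market segment update 8888' where CUST_ID = 8888",
    "UPDATE CUSTOMER SET NAME='Customer Name update 8889',"
    "MKTSEGMENT='Market segment update 8889' where CUST_ID = 8889",
    "DELETE FROM CUSTOMER where CUST_ID = 8887",
    "INSERT into CUSTOMER values(9000,'Customer Name 9000','Market segment 9000')",
]
for statement in statements:
    conn.execute(statement)

rows = conn.execute(
    "SELECT CUST_ID, NAME, MKTSEGMENT FROM CUSTOMER ORDER BY CUST_ID"
).fetchall()
for row in rows:
    print(row)
```

After both AWS Glue job runs, the Delta Lake table should reflect the same final state for these keys: customer 8887 is gone, customers 8888 and 8889 carry the updated values, and customer 9000 is present.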

Clean up

To clean up your resources, complete the following steps:

  1. Delete the CloudFormation stack gluejob-setup.
  2. Delete the CloudFormation stack vpc-msk-mskconnect-rds-client.

Conclusion

Organizations continually seek high-performance, cost-effective, and scalable analytical solutions to extract value from their operational data sources in near real time. The analytical platform must be capable of receiving updates to operational data as they happen. Traditional data lake solutions often struggle with managing changes in source data, but the Delta Lake framework addresses this challenge. This post illustrates the process of constructing an end-to-end change data capture (CDC) application using Amazon MSK, MSK Connect, AWS Glue, and native Delta Lake tables, alongside guidance on querying Delta Lake tables from Amazon Athena. This architectural pattern can be adapted to other data sources employing various Kafka connectors, enabling the creation of data lakes supporting UPSERT operations using AWS Glue and native Delta Lake tables. For further insights, see the MSK Connect examples.


About the authors

Shubham Purwar is a Cloud Engineer (ETL) at AWS Bengaluru specializing in AWS Glue and Athena. He is passionate about helping customers solve issues related to their ETL workload and implement scalable data processing and analytics pipelines on AWS. In his free time, Shubham loves to spend time with his family and travel around the world.

Nitin Kumar is a Cloud Engineer (ETL) at AWS, specializing in AWS Glue. With a decade of experience, he excels in aiding customers with their big data workloads, focusing on data processing and analytics. He is committed to helping customers overcome ETL challenges and develop scalable data processing and analytics pipelines on AWS. In his free time, he likes to watch movies and spend time with his family.

Manage Amazon Redshift provisioned clusters with Terraform

Post Syndicated from Amit Ghodke original https://aws.amazon.com/blogs/big-data/manage-amazon-redshift-provisioned-clusters-with-terraform/

Amazon Redshift is a fast, scalable, secure, and fully managed cloud data warehouse that makes it straightforward and cost-effective to analyze all your data using standard SQL and your existing extract, transform, and load (ETL); business intelligence (BI); and reporting tools. Tens of thousands of customers use Amazon Redshift to process exabytes of data per day and power analytics workloads such as BI, predictive analytics, and real-time streaming analytics.

HashiCorp Terraform is an infrastructure as code (IaC) tool that lets you define cloud resources in human-readable configuration files that you can version, reuse, and share. You can then use a consistent workflow to provision and manage your infrastructure throughout its lifecycle.

In this post, we demonstrate how to use Terraform to manage common Redshift cluster operations, such as:

  • Creating a new provisioned Redshift cluster using Terraform code and adding an AWS Identity and Access Management (IAM) role to it
  • Scheduling pause, resume, and resize operations for the Redshift cluster

Solution overview

The following diagram illustrates the solution architecture for provisioning a Redshift cluster using Terraform.

In addition to Amazon Redshift, the solution uses the following AWS services:

  • Amazon Elastic Compute Cloud (Amazon EC2) offers the broadest and deepest compute platform, with over 750 instances and choice of the latest processors, storage, networking, operating system (OS), and purchase model to help you best match the needs of your workload. For this post, we use an m5.xlarge instance with the Windows Server 2022 Datacenter Edition. The choice of instance type and Windows OS is flexible; you can choose a configuration that suits your use case.
  • IAM allows you to securely manage identities and access to AWS services and resources. We use IAM roles and policies to securely access services and perform relevant operations. An IAM role is an AWS identity that you can assume to gain temporary access to AWS services and resources. Each IAM role has a set of permissions defined by IAM policies. These policies determine the actions and resources the role can access.
  • AWS Secrets Manager allows you to securely store the user name and password needed to log in to Amazon Redshift.

In this post, we demonstrate how to set up an environment that connects AWS and Terraform. The following are the high-level tasks involved:

  1. Set up an EC2 instance with Windows OS in AWS.
  2. Install Terraform on the instance.
  3. Configure your environment variables (Windows OS).
  4. Define an IAM policy to have minimum access to perform activities on a Redshift cluster, including pause, resume, and resize.
  5. Establish an IAM role using the policy you created.
  6. Create a provisioned Redshift cluster using Terraform code.
  7. Attach the IAM role you created to the Redshift cluster.
  8. Write the Terraform code to schedule cluster operations like pause, resume, and resize.

Prerequisites

To complete the activities described in this post, you need an AWS account and administrator privileges on the account to use the key AWS services and create the necessary IAM roles.

Create an EC2 instance

We begin with creating an EC2 instance. Complete the following steps to create a Windows OS EC2 instance:

  1. On the Amazon EC2 console, choose Launch Instance.
  2. Choose a Windows Server Amazon Machine Image (AMI) that suits your requirements.
  3. Select an appropriate instance type for your use case.
  4. Configure the instance details:
    1. Choose the VPC and subnet where you want to launch the instance.
    2. Enable Auto-assign Public IP.
    3. For Add storage, configure the desired storage options for your instance.
    4. Add any necessary tags to the instance.
  5. For Configure security group, select or create a security group that allows the necessary inbound and outbound traffic to your instance.
  6. Review the instance configuration and choose Launch to start the instance creation process.
  7. For Select an existing key pair or create a new key pair, choose an existing key pair or create a new one.
  8. Choose Launch instance.
  9. When the instance is running, you can connect to it using the Remote Desktop Protocol (RDP) and the administrator password obtained from the Get Windows password option.

Install Terraform on the EC2 instance

Install Terraform on the Windows EC2 instance using the following steps:

  1. RDP into the EC2 instance you created.
  2. Install Terraform on the EC2 instance.

You need to update the environment variables to point to the directory where the Terraform executable is available.

  3. Under System Properties, on the Advanced tab, choose Environment Variables.
  4. Choose the path variable.
  5. Choose New and enter the path where Terraform is installed. For this post, it’s in the C:\ directory.
  6. Confirm Terraform is installed by entering the following command:

terraform -v

Optionally, you can use an editor like Visual Studio Code (VS Code) and add the Terraform extension to it.

Create a user for accessing AWS through code (AWS CLI and Terraform)

Next, we create an administrator user in IAM, which performs the operations on AWS through Terraform and the AWS Command Line Interface (AWS CLI). Complete the following steps:

  1. Create a new IAM user.
  2. On the IAM console, download and save the access key ID and secret access key.
  3. Install the AWS CLI.
  4. Launch the AWS CLI, run aws configure, and pass the access key ID, secret access key, and default AWS Region.

This keeps the AWS access key ID and secret access key out of the Terraform code, so they aren’t visible in plain text and can’t be shared accidentally when the code is committed to a code repository.

Create a user for accessing Redshift through code (Terraform)

Creating the Redshift cluster and running the subsequent operations requires the cluster’s administrator user name and password (different from the IAM administrator user we created earlier), which the code needs to reference. To do this securely, we use Secrets Manager to store the user name and password. We write code in Terraform to access these credentials during the cluster create operation. Complete the following steps:

  1. On the Secrets Manager console, choose Secrets in the navigation pane.
  2. Choose Store a new secret.
  3. For Secret type, select Credentials for Amazon Redshift data warehouse.
  4. Enter your credentials.

Set up Terraform

Complete the following steps to set up Terraform:

  1. Create a folder or directory for storing all your Terraform code.
  2. Open the VS Code editor and browse to your folder.
  3. Choose New File and enter a name for the file using the .tf extension.

Now we’re ready to start writing our code starting with defining providers. The providers definition is a way for Terraform to get the necessary APIs to interact with AWS.

  4. Configure a provider for Terraform:
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "5.53.0"
    }
  }
}

# Configure the AWS Provider
provider "aws" {
  region = "us-east-1"
}
  5. Access the admin credentials for the Amazon Redshift admin user:
data "aws_secretsmanager_secret_version" "creds" {
  # Fill in the name you gave to your secret
  secret_id = "terraform-creds"
}

/* Use jsondecode to parse the secret's JSON string */
locals {
  terraform-creds = jsondecode(
    data.aws_secretsmanager_secret_version.creds.secret_string
  )
}

Create a Redshift cluster

To create a Redshift cluster, use the aws_redshift_cluster resource:

# Create an encrypted Amazon Redshift cluster
resource "aws_redshift_cluster" "dw_cluster" {
  cluster_identifier  = "tf-example-redshift-cluster"
  database_name       = "dev"
  master_username     = local.terraform-creds.username
  master_password     = local.terraform-creds.password
  node_type           = "ra3.xlplus"
  cluster_type        = "multi-node"
  publicly_accessible = false
  number_of_nodes     = 2
  encrypted           = true
  # KMS key ARN read from a second secret; its definition is in the
  # complete listing under "Test the solution"
  kms_key_id                = local.RedshiftClusterEncryptionKeySecret.arn
  enhanced_vpc_routing      = true
  cluster_subnet_group_name = "<<your-cluster-subnet-groupname>>"
}

In this example, we create a two-node Redshift cluster called tf-example-redshift-cluster that uses the ra3.xlplus node type. We read the credentials from Secrets Manager and use jsondecode to access the individual values. This makes sure the user name and password aren’t written in plain text in the code.
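As a rough illustration of what jsondecode does with the secret, the following Python sketch parses a JSON secret string into a map with username and password keys. The payload and the decode_creds helper are hypothetical; a real Secrets Manager secret of the Amazon Redshift type may carry additional keys.

```python
import json

# Hypothetical secret payload, for illustration only.
secret_string = '{"username": "awsuser", "password": "example-only"}'


def decode_creds(raw: str) -> dict:
    """Parse the secret's JSON string, similar to Terraform's jsondecode()."""
    creds = json.loads(raw)
    missing = {"username", "password"} - creds.keys()
    if missing:
        raise KeyError(f"secret is missing keys: {sorted(missing)}")
    return creds


creds = decode_creds(secret_string)
print(creds["username"])  # the password is deliberately not printed
```

In Terraform, the decoded values are then referenced as local.terraform-creds.username and local.terraform-creds.password, as in the cluster resource above.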

Add an IAM role to the cluster

Because we didn’t have the option to associate an IAM role during cluster creation, we do so now with the following code:

resource "aws_redshift_cluster_iam_roles" "cluster_iam_role" {
  cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
  iam_role_arns      = ["arn:aws:iam::yourawsaccountId:role/service-role/yourIAMrolename"]
}

Enable Redshift cluster operations

Running operations such as resize, pause, and resume on a schedule is usually more practical than triggering them manually. To do this, we create two policies: a trust policy that lets the Amazon Redshift scheduler service assume a role, and a permissions policy that allows the cluster pause, resume, and resize operations. Then we create a role that has both policies attached to it.
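To make the two policies concrete, the following Python sketch builds the JSON documents that the Terraform policy document data sources in this section are expected to render to. The account ID is a placeholder, and the exact JSON that Terraform emits may differ in formatting or statement IDs.

```python
import json

ACCOUNT_ID = "123456789012"  # placeholder account ID (assumption)

# Trust policy: lets the Amazon Redshift scheduler service assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "scheduler.redshift.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

# Permissions policy: the cluster operations the scheduled actions may run.
permissions_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "redshift:PauseCluster",
                "redshift:ResumeCluster",
                "redshift:ResizeCluster",
            ],
            "Resource": f"arn:aws:redshift:*:{ACCOUNT_ID}:cluster:*",
        }
    ],
}

print(json.dumps(trust_policy, indent=2))
print(json.dumps(permissions_policy, indent=2))
```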

You can also perform these steps directly on the console and then reference the resulting resources in Terraform code. The following example shows the code snippets to create the policies and the role, and then attach the policy to the role.

  1. Create the trust policy document for the Amazon Redshift scheduler and the role that uses it:
# Define the policy document that establishes the trust relationship
# between the role and the entity (Redshift scheduler)
data "aws_iam_policy_document" "assume_role_scheduling" {
  statement {
    effect = "Allow"
    principals {
      type        = "Service"
      identifiers = ["scheduler.redshift.amazonaws.com"]
    }
    actions = ["sts:AssumeRole"]
  }
}

# Create a role with the above trust relationship attached to it,
# so that the Redshift scheduling service can assume it
resource "aws_iam_role" "scheduling_role" {
  name               = "redshift_scheduled_action_role"
  assume_role_policy = data.aws_iam_policy_document.assume_role_scheduling.json
}
  2. Create a policy document and policy for Amazon Redshift operations:
/* Define the policy document for other Redshift operations */
data "aws_iam_policy_document" "redshift_operations_policy_definition" {
  statement {
    effect = "Allow"
    actions = [
      "redshift:PauseCluster",
      "redshift:ResumeCluster",
      "redshift:ResizeCluster",
    ]
    resources = ["arn:aws:redshift:*:youraccountid:cluster:*"]
  }
}

/* Create the policy and add the above data (JSON) to the policy */
resource "aws_iam_policy" "scheduling_actions_policy" {
  name   = "redshift_scheduled_action_policy"
  policy = data.aws_iam_policy_document.redshift_operations_policy_definition.json
}
  3. Attach the policy to the IAM role:
/* Connect the policy and the role */
resource "aws_iam_role_policy_attachment" "role_policy_attach" {
  policy_arn = aws_iam_policy.scheduling_actions_policy.arn
  role       = aws_iam_role.scheduling_role.name
}
  4. Pause the Redshift cluster:
# Pause a cluster
resource "aws_redshift_scheduled_action" "pause_operation" {
  name     = "tf-redshift-scheduled-action-pause"
  schedule = "cron(00 22 * * ? *)"
  iam_role = aws_iam_role.scheduling_role.arn
  target_action {
    pause_cluster {
      cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
    }
  }
}

In the preceding example, we created a scheduled action called tf-redshift-scheduled-action-pause that pauses the cluster at 10:00 PM (UTC) every day as a cost-saving action.

  5. Resume the Redshift cluster:
# Resume a cluster
resource "aws_redshift_scheduled_action" "resume_operation" {
  name     = "tf-redshift-scheduled-action-resume"
  schedule = "cron(15 07 * * ? *)"
  iam_role = aws_iam_role.scheduling_role.arn
  target_action {
    resume_cluster {
      cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
    }
  }
}

In the preceding example, we created a scheduled action called tf-redshift-scheduled-action-resume that resumes the cluster at 7:15 AM (UTC) every day, in time for business operations to start using the Redshift cluster.

  6. Resize the Redshift cluster:
# Resize a cluster
resource "aws_redshift_scheduled_action" "resize_operation" {
  name     = "tf-redshift-scheduled-action-resize"
  schedule = "cron(15 14 * * ? *)"
  iam_role = aws_iam_role.scheduling_role.arn
  target_action {
    resize_cluster {
      cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
      cluster_type       = "multi-node"
      node_type          = "ra3.xlplus"
      number_of_nodes    = 4    /* increase the number of nodes using the resize operation */
      classic            = true /* default is elastic resize; set classic = true to use classic resize */
    }
  }
}

In the preceding example, we created a scheduled action called tf-redshift-scheduled-action-resize that increases the number of nodes from 2 to 4. You can also perform other operations, such as changing the node type. By default, elastic resize is used; to use classic resize, pass the parameter classic = true, as shown in the preceding code. You can schedule this action ahead of peak periods to resize appropriately for that duration, and then downsize using similar code during non-peak times.
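The schedule values used in these scheduled actions follow the six-field cron(minute hour day-of-month month day-of-week year) layout. The following Python sketch splits such an expression into named fields and formats the daily run time. The helper names are hypothetical, the sketch handles only fixed hour and minute values, and it assumes the schedule is evaluated in UTC.

```python
import re

FIELD_NAMES = ["minute", "hour", "day_of_month", "month", "day_of_week", "year"]


def parse_schedule(expr: str) -> dict:
    """Split a cron(...) schedule expression into its six named fields."""
    match = re.fullmatch(r"cron\((.+)\)", expr.strip())
    if match is None:
        raise ValueError(f"not a cron() expression: {expr!r}")
    fields = match.group(1).split()
    if len(fields) != len(FIELD_NAMES):
        raise ValueError(f"expected 6 fields, got {len(fields)}")
    return dict(zip(FIELD_NAMES, fields))


def daily_time_utc(expr: str) -> str:
    """Format the run time of a schedule with fixed minute and hour fields."""
    fields = parse_schedule(expr)
    return f"{int(fields['hour']):02d}:{int(fields['minute']):02d} UTC"


print(daily_time_utc("cron(00 22 * * ? *)"))  # pause schedule
print(daily_time_utc("cron(15 07 * * ? *)"))  # resume schedule
```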

Test the solution

We apply the following code to test the solution. Change the resource details accordingly, such as account ID and Region name.

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "5.53.0"
    }
  }
}

# Configure the AWS Provider
provider "aws" {
  region = "us-east-1"
}

# access secrets stored in secret manager
data "aws_secretsmanager_secret_version" "creds" {
  # Fill in the name you gave to your secret
  secret_id = "terraform-creds"
}

/*json decode to parse the secret*/
locals {
  terraform-creds = jsondecode(
    data.aws_secretsmanager_secret_version.creds.secret_string
  )
}

#Store the arn of the KMS key to be used for encrypting the redshift cluster

data "aws_secretsmanager_secret_version" "encryptioncreds" {
  secret_id = "RedshiftClusterEncryptionKeySecret"
}
locals {
  RedshiftClusterEncryptionKeySecret = jsondecode(
    data.aws_secretsmanager_secret_version.encryptioncreds.secret_string
  )
}

# Create an encrypted Amazon Redshift cluster
resource "aws_redshift_cluster" "dw_cluster" {
  cluster_identifier = "tf-example-redshift-cluster"
  database_name      = "dev"
  master_username    = local.terraform-creds.username
  master_password    = local.terraform-creds.password
  node_type          = "ra3.xlplus"
  cluster_type       = "multi-node"
  publicly_accessible = false
  number_of_nodes    = 2
  encrypted         = true
  kms_key_id        = local.RedshiftClusterEncryptionKeySecret.arn
  enhanced_vpc_routing = true
  cluster_subnet_group_name="redshiftclustersubnetgroup-yuu4sywme0bk"
}

#add IAM Role to the Redshift cluster

resource "aws_redshift_cluster_iam_roles" "cluster_iam_role" {
  cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
  iam_role_arns      = ["arn:aws:iam::youraccountid:role/service-role/yourrolename"]
}

#for audit logging please create an S3 bucket which has read write privileges for Redshift service, this example does not include S3 bucket creation.

resource "aws_redshift_logging" "redshiftauditlogging" {
  cluster_identifier   = aws_redshift_cluster.dw_cluster.cluster_identifier
  log_destination_type = "s3"
  bucket_name          = "your-s3-bucket-name"
}

#to do operations like pause, resume, resize on a schedule we need to first create a role that has permissions to perform these operations on the cluster

#define policy document to establish the Trust Relationship between the role and the entity (Redshift scheduler)

data "aws_iam_policy_document" "assume_role_scheduling" {
  statement {
    effect = "Allow"
    principals {
      type        = "Service"
      identifiers = ["scheduler.redshift.amazonaws.com"]
    }

    actions = ["sts:AssumeRole"]
  }
}

#create a role that has the above trust relationship attached to it, so that it can invoke the redshift scheduling service
resource "aws_iam_role" "scheduling_role" {
  name               = "redshift_scheduled_action_role"
  assume_role_policy = data.aws_iam_policy_document.assume_role_scheduling.json
}

/*define the policy document for other redshift operations*/

data "aws_iam_policy_document" "redshift_operations_policy_definition" {
  statement {
    effect = "Allow"
    actions = [
      "redshift:PauseCluster",
      "redshift:ResumeCluster",
      "redshift:ResizeCluster",
    ]

    resources =  ["arn:aws:redshift:*:youraccountid:cluster:*"]
  }
}

/*create the policy and add the above data (json) to the policy*/

resource "aws_iam_policy" "scheduling_actions_policy" {
  name   = "redshift_scheduled_action_policy"
  policy = data.aws_iam_policy_document.redshift_operations_policy_definition.json
}

/*connect the policy and the role*/

resource "aws_iam_role_policy_attachment" "role_policy_attach" {
  policy_arn = aws_iam_policy.scheduling_actions_policy.arn
  role       = aws_iam_role.scheduling_role.name
}

#pause a cluster

resource "aws_redshift_scheduled_action" "pause_operation" {
  name     = "tf-redshift-scheduled-action-pause"
  schedule = "cron(00 14 * * ? *)"
  iam_role = aws_iam_role.scheduling_role.arn
  target_action {
    pause_cluster {
      cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
    }
  }
}

#resume a cluster

resource "aws_redshift_scheduled_action" "resume_operation" {
  name     = "tf-redshift-scheduled-action-resume"
  schedule = "cron(15 14 * * ? *)"
  iam_role = aws_iam_role.scheduling_role.arn
  target_action {
    resume_cluster {
      cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
    }
  }
}

#resize a cluster

resource "aws_redshift_scheduled_action" "resize_operation" {
  name     = "tf-redshift-scheduled-action-resize"
  schedule = "cron(15 14 * * ? *)"
  iam_role = aws_iam_role.scheduling_role.arn
  target_action {
    resize_cluster {
      cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
      cluster_type = "multi-node"
      node_type = "ra3.xlplus"
      number_of_nodes = 4 /*increase the number of nodes using resize operation*/
      classic = true /* default is elastic resize; set classic = true to use classic resize */
    }
  }
}

Run terraform plan to see a list of changes that will be made, as shown in the following screenshot.

Terraform plan

After you have reviewed the changes, use terraform apply to create the resources you defined.

Terraform Apply

You will be asked to enter yes or no before Terraform starts creating the resources.

Confirmation of apply

You can confirm that the cluster is being created on the Amazon Redshift console.

redshift cluster creation

After the cluster is created, the IAM roles and schedules for pause, resume, and resize operations are added, as shown in the following screenshot.

Terraform actions

You can also view these scheduled operations on the Amazon Redshift console.

Scheduled Actions

Clean up

If you deployed the Redshift cluster, IAM roles, or any of the other associated resources by running terraform apply, run terraform destroy to tear them down and avoid incurring further charges on your AWS account.

Conclusion

Terraform offers a powerful and flexible solution for managing your infrastructure as code using a declarative approach, with a cloud-agnostic nature, resource orchestration capabilities, and strong community support. This post provided a guide to using Terraform to deploy a Redshift cluster and perform important operations such as resize, resume, and pause on the cluster. Embracing IaC and using the right tools, such as VS Code and Terraform, will enable you to build scalable and maintainable distributed applications, and automate processes.


About the Authors

Amit Ghodke is an Analytics Specialist Solutions Architect based out of Austin. He has worked with databases, data warehouses and analytical applications for the past 16 years. He loves to help customers implement analytical solutions at scale to derive maximum business value.

Ritesh Kumar Sinha is an Analytics Specialist Solutions Architect based out of San Francisco. He has helped customers build scalable data warehousing and big data solutions for over 16 years. He loves to design and build efficient end-to-end solutions on AWS. In his spare time, he loves reading, walking, and doing yoga.

Amazon Managed Service for Apache Flink now supports Apache Flink version 1.19

Post Syndicated from Francisco Morillo original https://aws.amazon.com/blogs/big-data/amazon-managed-service-for-apache-flink-now-supports-apache-flink-version-1-19/

Apache Flink is an open source distributed processing engine, offering powerful programming interfaces for both stream and batch processing, with first-class support for stateful processing and event time semantics. Apache Flink supports multiple programming languages (Java, Python, Scala, and SQL) and multiple APIs with different levels of abstraction, which can be used interchangeably in the same application.

Amazon Managed Service for Apache Flink offers a fully managed, serverless experience in running Apache Flink applications and now supports Apache Flink 1.19.1, the latest stable version of Apache Flink at the time of writing. AWS led the community release of version 1.19.1, which introduces a number of bug fixes over version 1.19.0, released in March 2024.

In this post, we discuss some of the interesting new features and configuration changes available for Managed Service for Apache Flink introduced with this new release. In every Apache Flink release, there are exciting new experimental features. However, in this post, we are going to focus on the features most accessible to the user with this release.

Connectors

With the release of version 1.19.1, the Apache Flink community also released new connector versions for the 1.19 runtime. Starting from 1.16, Apache Flink introduced a new connector version numbering, following the pattern <connector-version>-<flink-version>. It’s recommended to use connectors for the runtime version you are using. Refer to Using Apache Flink connectors to stay updated on any future changes regarding connector versions and compatibility.
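As a small illustration of the <connector-version>-<flink-version> pattern, the following Python sketch splits a connector version string and checks it against a runtime version. The helper names and the sample version strings are illustrative assumptions, not a statement about which connector releases exist.

```python
from typing import NamedTuple


class ConnectorVersion(NamedTuple):
    connector: str  # version of the connector itself
    flink: str      # Flink runtime version the connector targets


def parse_connector_version(version: str) -> ConnectorVersion:
    """Split '<connector-version>-<flink-version>' at the last hyphen."""
    connector, sep, flink = version.rpartition("-")
    if not sep or not connector or not flink:
        raise ValueError(f"unexpected connector version format: {version!r}")
    return ConnectorVersion(connector, flink)


def matches_runtime(version: str, runtime: str) -> bool:
    """Return True if the connector was built for the given Flink runtime."""
    return parse_connector_version(version).flink == runtime


print(parse_connector_version("4.3.0-1.19"))
print(matches_runtime("4.3.0-1.19", "1.19"))
```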

SQL

Apache Flink 1.19 brings new features and improvements, particularly in the SQL API. These enhancements are designed to provide more flexibility, better performance, and ease of use for developers working with Flink’s SQL API. In this section, we delve into some of the most notable SQL enhancements introduced in this release.

State TTL per operator

Configuring state TTL at the operator level was introduced in Apache Flink 1.18 but wasn’t easily accessible to the end-user. To modify an operator TTL, you had to export the plan at development time, modify it manually, and force Apache Flink to use the edited plan instead of generating a new one when the application starts. The new features added to Flink SQL in 1.19 simplify this process by allowing TTL configurations directly through SQL hints, eliminating the need for JSON plan manipulation.

The following code shows examples of how to use SQL hints to set state TTL:

-- State TTL for Joins
SELECT /*+ STATE_TTL('Orders' = '1d', 'Customers' = '20d') */ 
  *
FROM Orders 
LEFT OUTER JOIN Customers 
  ON Orders.o_custkey = Customers.c_custkey;

-- State TTL for Aggregations
SELECT /*+ STATE_TTL('o' = '1d') */ 
  o_orderkey, SUM(o_totalprice) AS revenue 
FROM Orders AS o 
GROUP BY o_orderkey;

Session window table-valued functions

Windows are at the heart of processing infinite streams in Apache Flink, splitting the stream into finite buckets for computations. Before 1.19, Apache Flink provided the following types of window table-valued functions (TVFs):

  • Tumble windows – Fixed-size, non-overlapping windows
  • Hop windows – Fixed-size, overlapping windows with a specified hop interval
  • Cumulate windows – Increasingly larger windows that start at the same point but grow over time

Apache Flink 1.19 enhances its SQL capabilities by supporting session window TVFs in streaming mode, allowing for more sophisticated and flexible windowing operations directly within SQL queries. Applications can now create dynamic windows that group elements based on session gaps. The following code shows an example:

-- Session window with partition keys
SELECT 
  * 
FROM TABLE(
  SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));

-- Apply aggregation on the session windowed table with partition keys
SELECT 
  window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
  SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;
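To show what a session gap means in practice, the following Python sketch groups bids per item into sessions that close after 5 minutes of inactivity, mirroring the aggregation query above. It is a conceptual model only, not Flink code: the sample bids are invented, and the handling of events that arrive exactly one gap apart is an assumption.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def session_windows(events, gap):
    """Group (item, bidtime, price) events into per-item session windows.

    Returns (item, window_start, window_end, total_price) tuples, where
    window_end is the last event time in the session plus the gap.
    """
    by_item = defaultdict(list)
    for item, ts, price in events:
        by_item[item].append((ts, price))

    results = []
    for item, rows in by_item.items():
        rows.sort()
        start, last, total = rows[0][0], rows[0][0], rows[0][1]
        for ts, price in rows[1:]:
            if ts - last > gap:  # inactivity exceeded the gap: close the session
                results.append((item, start, last + gap, total))
                start, total = ts, 0.0
            last = ts
            total += price
        results.append((item, start, last + gap, total))
    return results


bids = [
    ("A", datetime(2024, 1, 1, 8, 7), 2.0),
    ("A", datetime(2024, 1, 1, 8, 9), 3.0),
    ("B", datetime(2024, 1, 1, 8, 11), 1.0),
    ("A", datetime(2024, 1, 1, 8, 21), 4.0),
]
for row in session_windows(bids, timedelta(minutes=5)):
    print(row)
```

Item A produces two sessions because its third bid arrives 12 minutes after the second, which is longer than the 5-minute gap.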

Mini-batch optimization for regular joins

When using the Table API or SQL, regular joins (standard equi-joins, like an ordinary SQL table join, where time is not a factor) may induce considerable overhead for the state backend, especially when using RocksDB.

Normally, Apache Flink processes standard joins one record at a time, looking up the state for a matching record in the other side of the join, updating the state with the input record, and emitting the resulting record. This may add considerable pressure on RocksDB, with multiple reads and writes for each record.

Apache Flink 1.19 introduces the ability to use mini-batch processing with equi-joins (FLIP-415). When enabled, Apache Flink will process regular joins not one record at a time, but in small batches, substantially reducing the pressure on the RocksDB state backend. Mini-batching adds some latency, which is controllable by the user. See, for example, the following SQL code (embedded in Java):

TableConfig tableConfig = tableEnv.getConfig();
tableConfig.set("table.exec.mini-batch.enabled", "true");
tableConfig.set("table.exec.mini-batch.allow-latency", "5s");
tableConfig.set("table.exec.mini-batch.size", "5000");

tableEnv.executeSql("CREATE TEMPORARY VIEW ab AS " +
  "SELECT a.id as a_id, a.a_content, b.id as b_id, b.b_content " +
  "FROM a LEFT JOIN b ON a.id = b.id");

With this configuration, Apache Flink will buffer up to 5,000 records or up to 5 seconds, whichever comes first, before processing the join for the entire mini-batch.
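The "whichever comes first" behavior can be pictured with a small buffer that flushes on either a size trigger or a latency trigger. The following Python sketch is a conceptual simplification with hypothetical names. It does not reproduce Flink's internal mini-batch mechanism, and it measures latency from the first buffered record, which is an assumption made for clarity.

```python
class MiniBatchBuffer:
    """Buffer records and flush when a size or latency limit is reached."""

    def __init__(self, max_size: int, max_latency_s: float):
        self.max_size = max_size
        self.max_latency_s = max_latency_s
        self.records = []
        self.first_arrival = None

    def add(self, record, now: float):
        """Buffer a record; return the flushed batch if the size limit is hit."""
        if not self.records:
            self.first_arrival = now
        self.records.append(record)
        if len(self.records) >= self.max_size:
            return self.flush()
        return None

    def on_timer(self, now: float):
        """Return the flushed batch if the oldest record has waited long enough."""
        if self.records and now - self.first_arrival >= self.max_latency_s:
            return self.flush()
        return None

    def flush(self):
        batch, self.records = self.records, []
        self.first_arrival = None
        return batch


buffer = MiniBatchBuffer(max_size=3, max_latency_s=5.0)
print(buffer.add("r1", now=0.0))  # still buffering
print(buffer.add("r2", now=1.0))  # still buffering
print(buffer.add("r3", now=2.0))  # size trigger fires
print(buffer.add("r4", now=3.0))  # new batch starts
print(buffer.on_timer(now=8.0))   # latency trigger fires
```

The join logic would then run once per flushed batch instead of once per record, which is where the reduction in state accesses comes from.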

In Apache Flink 1.19, mini-batching only works for regular joins, not windowed or temporal joins. Mini-batching is disabled by default, and you have to explicitly enable it and set the batch size and latency for Flink to use it. Also, mini-batch settings are global and apply to all regular joins in your application. At the time of writing, it’s not possible to set mini-batching per join statement.

AsyncScalarFunction

Before version 1.19, an important limitation of SQL and the Table API, compared to the Java DataStream API, was the lack of asynchronous I/O support. Any request to an external system, for example a database, a REST API, or even an AWS API call using the AWS SDK, was synchronous and blocking. An Apache Flink subtask waited for the response before completing the processing of a record and proceeding to the next one. In practice, the round-trip latency of each request was added to the processing latency of every record. Apache Flink’s Async I/O API removes this limitation, but it’s only available for the DataStream API and Java. Until version 1.19, there was no simple, efficient workaround in SQL, the Table API, or Python.

Apache Flink 1.19 introduces the new AsyncScalarFunction, a user-defined function (UDF) that can be implemented using non-blocking calls to the external system, to support use cases similar to asynchronous I/O in SQL and the Table API.

This new type of UDF is only available in streaming mode. At the moment, it only supports ordered output. DataStream Async I/O also supports unordered output, which may further reduce latency when strict ordering isn’t required.
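The benefit of non-blocking calls with ordered output can be illustrated outside Flink. The following Python sketch uses asyncio to issue several simulated lookups concurrently while returning the results in input order. It is a conceptual analogy only: lookup is a hypothetical stand-in for an external call, and none of this is the AsyncScalarFunction API.

```python
import asyncio
import time


async def lookup(key: str) -> str:
    """Hypothetical non-blocking call to an external system."""
    await asyncio.sleep(0.1)  # simulated round-trip latency
    return key.upper()


async def enrich_ordered(keys):
    # All requests are in flight at the same time; gather() returns
    # the results in the same order as the inputs (ordered output).
    return await asyncio.gather(*(lookup(k) for k in keys))


def run_demo(keys):
    start = time.perf_counter()
    results = asyncio.run(enrich_ordered(keys))
    return results, time.perf_counter() - start


results, elapsed = run_demo(["a", "b", "c", "d", "e"])
print(results)
print(f"elapsed: {elapsed:.2f}s (five blocking calls would take about 0.5s)")
```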

Python 3.11 support

Python 3.11 is now supported, and Python 3.7 support has been completely removed (FLINK-33029). Managed Service for Apache Flink currently uses the Python 3.11 runtime to run PyFlink applications. Python 3.11 is a bugfix only version of the runtime. Python 3.11 introduced several performance improvements and bug fixes, but no API breaking changes.

Performance improvements: Dynamic checkpoint interval

Apache Flink 1.19 improves checkpoint behavior by letting an application adjust its checkpointing interval dynamically, based on whether the source is processing backlog data (FLIP-309).

In Apache Flink 1.19, you can now specify different checkpointing intervals based on whether a source operator is processing backlog data. This flexibility optimizes job performance by reducing checkpoint frequency during backlog phases, enhancing overall throughput. Extending checkpoint intervals allows Apache Flink to prioritize processing throughput over frequent state snapshots, thereby improving efficiency and performance.

To enable it, you need to define the execution.checkpointing.interval parameter for regular intervals and execution.checkpointing.interval-during-backlog to specify a longer interval when sources report processing backlog.

For example, if you want to run checkpoints every 60 seconds during normal processing, but extend to 10 minutes during the processing of backlogs, you can set the following:

  • execution.checkpointing.interval = 60s
  • execution.checkpointing.interval-during-backlog = 10m
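The selection logic behind these two settings can be sketched in a few lines of Python. The helper names are hypothetical, the duration parser handles only the simple suffixes used here, and the real decision is made inside Apache Flink based on what the sources report.

```python
from datetime import timedelta

_UNITS = {"ms": "milliseconds", "s": "seconds", "m": "minutes", "h": "hours"}


def parse_duration(text: str) -> timedelta:
    """Parse simple durations such as '60s' or '10m' (subset of Flink's syntax)."""
    text = text.strip()
    for suffix in ("ms", "s", "m", "h"):  # check "ms" before "s" and "m"
        if text.endswith(suffix):
            return timedelta(**{_UNITS[suffix]: int(text[: -len(suffix)])})
    raise ValueError(f"unsupported duration: {text!r}")


def effective_interval(config: dict, processing_backlog: bool) -> timedelta:
    """Pick the checkpoint interval that applies in the current phase."""
    regular = parse_duration(config["execution.checkpointing.interval"])
    backlog = config.get("execution.checkpointing.interval-during-backlog")
    if processing_backlog and backlog is not None:
        return parse_duration(backlog)
    return regular


config = {
    "execution.checkpointing.interval": "60s",
    "execution.checkpointing.interval-during-backlog": "10m",
}
print(effective_interval(config, processing_backlog=False))
print(effective_interval(config, processing_backlog=True))
```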

In Amazon Managed Service for Apache Flink, the default checkpointing interval is configured by the application configuration (60 seconds by default). You don’t need to set the configuration parameter. To set a longer checkpointing interval during backlog processing, you can raise a support case to modify execution.checkpointing.interval-during-backlog. See Modifiable Flink configuration properties for further details about modifying Apache Flink configurations.

At the time of writing, dynamic checkpointing intervals are only supported by Apache Kafka source and FileSystem source connectors. If you use any other source connector, intervals during backlog are ignored, and Apache Flink runs a checkpoint at the default interval during backlog processing.

In Apache Flink, checkpoints are always injected in the flow from the sources. This feature only involves source connectors. The sink connectors you use in your application don’t affect this feature. For a deep dive into the Apache Flink checkpoint mechanism, see Optimize checkpointing in your Amazon Managed Service for Apache Flink applications with buffer debloating and unaligned checkpoints.

More troubleshooting information: Job initialization and checkpoint traces

With FLIP-384, Apache Flink 1.19 introduces trace reporters, which show checkpointing and job initialization traces. As of 1.19, this trace information can be sent to the logs using Slf4j. In Managed Service for Apache Flink, this is now enabled by default. You can find checkpoint and job initialization details in Amazon CloudWatch Logs, with the other logs from the application.

Checkpoint traces contain valuable information about each checkpoint. You can find similar information on the Apache Flink Dashboard, but only for the latest checkpoints and only while the application is running. Conversely, in the logs, you can find the full history of checkpoints. The following is an example of a checkpoint trace:

SimpleSpan{
  scope=org.apache.flink.runtime.checkpoint.CheckpointStatsTracker, 
  name=Checkpoint, 
  startTsMillis=1718779769305, 
  endTsMillis=1718779769542, 
  attributes={
    jobId=1b418a2404cbcf47ef89071f83f2dff9, 
    checkpointId=9774, 
    checkpointStatus=COMPLETED, 
    fullSize=9585, 
    checkpointedSize=9585
  }
}

Job initialization traces are generated when the job starts and recovers the state from a checkpoint or savepoint. You can find valuable statistics you can’t normally find elsewhere, including the Apache Flink Dashboard. The following is an example of a job initialization trace:

SimpleSpan{
  scope=org.apache.flink.runtime.checkpoint.CheckpointStatsTracker,
  name=JobInitialization,
  startTsMillis=1718781201463,
  endTsMillis=1718781409657,
  attributes={
    maxReadOutputDataDurationMs=89,
    initializationStatus=COMPLETED,
    fullSize=26167879378,
    sumMailboxStartDurationMs=621,
    sumGateRestoreDurationMs=29,
    sumDownloadStateDurationMs=199482,
    sumRestoredStateSizeBytes.LOCAL_MEMORY=46764,
    checkpointId=270,
    sumRestoredStateSizeBytes.REMOTE=26167832614,
    maxDownloadStateDurationMs=199482,
    sumReadOutputDataDurationMs=90,
    maxRestoredStateSizeBytes.REMOTE=26167832614,
    maxInitializeStateDurationMs=201122,
    sumInitializeStateDurationMs=201241,
    jobId=8edb291c9f1c91c088db51b48de42308,
    maxGateRestoreDurationMs=22,
    maxMailboxStartDurationMs=391,
    maxRestoredStateSizeBytes.LOCAL_MEMORY=46764
  }
}

Checkpoint and job initialization traces are logged at INFO level. You can find them in CloudWatch Logs only if you configure a logging level of INFO or DEBUG in your Managed Service for Apache Flink application.
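If you export these log entries for analysis, a small parser can turn a trace into structured data. The following Python sketch extracts the span name, duration, and attributes from the checkpoint trace shown earlier. It assumes the textual layout of that example; the exact format in your logs may differ, so treat the regular expressions as a starting point.

```python
import re

SAMPLE_TRACE = """SimpleSpan{
  scope=org.apache.flink.runtime.checkpoint.CheckpointStatsTracker,
  name=Checkpoint,
  startTsMillis=1718779769305,
  endTsMillis=1718779769542,
  attributes={
    jobId=1b418a2404cbcf47ef89071f83f2dff9,
    checkpointId=9774,
    checkpointStatus=COMPLETED,
    fullSize=9585,
    checkpointedSize=9585
  }
}"""


def parse_span(text: str) -> dict:
    """Extract name, duration, and attributes from a SimpleSpan log entry."""
    flat = " ".join(text.split())  # collapse line breaks and indentation
    name = re.search(r"name=(\w+)", flat).group(1)
    start = int(re.search(r"startTsMillis=(\d+)", flat).group(1))
    end = int(re.search(r"endTsMillis=(\d+)", flat).group(1))
    body = re.search(r"attributes=\{(.*?)\}", flat).group(1)
    attributes = dict(
        pair.strip().split("=", 1) for pair in body.split(",") if pair.strip()
    )
    return {"name": name, "durationMs": end - start, "attributes": attributes}


span = parse_span(SAMPLE_TRACE)
print(span["name"], span["durationMs"], span["attributes"]["checkpointStatus"])
```

For the sample trace, the duration is the difference between endTsMillis and startTsMillis, which is 237 milliseconds.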

Managed Service for Apache Flink behavior change

As a fully managed service, Managed Service for Apache Flink controls some runtime configuration parameters to guarantee the stability of your application. For details about the Apache Flink settings that can be modified, see Apache Flink settings.

With the 1.19 runtime, if you programmatically modify a configuration parameter that is directly controlled by Managed Service for Apache Flink, you receive an explicit ProgramInvocationException when the application starts, explaining what parameter is causing the problem and preventing the application from starting. With runtime 1.18 or earlier, changes to parameters controlled by the managed service were silently ignored.

To learn more about how Managed Service for Apache Flink handles configuration changes in runtime 1.19 or later, refer to FlinkRuntimeException: “Not allowed configuration change(s) were detected”.

Conclusion

In this post, we explored some of the new relevant features and configuration changes introduced with Apache Flink 1.19, now supported by Managed Service for Apache Flink. This latest version brings numerous enhancements aimed at improving performance, flexibility, and usability for developers working with Apache Flink.

With support for Apache Flink 1.19, Managed Service for Apache Flink now runs the latest released Apache Flink version. We have seen some of the interesting new features available for Flink SQL and PyFlink.

You can find more details about recent releases in the Apache Flink blog and release notes.

If you’re new to Apache Flink, we recommend our guide to choosing the right API and language and following the getting started guide to start using Managed Service for Apache Flink.

If you’re already running an application in Managed Service for Apache Flink, you can safely upgrade it in-place to the new 1.19 runtime.


About the Authors

Francisco Morillo is a Streaming Solutions Architect at AWS, specializing in real-time analytics architectures. With over five years in the streaming data space, Francisco has worked as a data analyst for startups and as a big data engineer for consultancies, building streaming data pipelines. He has deep expertise in Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon Managed Service for Apache Flink. Francisco collaborates closely with AWS customers to build scalable streaming data solutions and advanced streaming data lakes, ensuring seamless data processing and real-time insights.

Lorenzo Nicora works as Senior Streaming Solution Architect at AWS, helping customers across EMEA. He has been building cloud-centered, data-intensive systems for over 25 years, working in the finance industry both through consultancies and for FinTech product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.

Automate data loading from your database into Amazon Redshift using AWS Database Migration Service (DMS), AWS Step Functions, and the Redshift Data API

Post Syndicated from Ritesh Sinha original https://aws.amazon.com/blogs/big-data/automate-data-loading-from-your-database-into-amazon-redshift-using-aws-database-migration-service-dms-aws-step-functions-and-the-redshift-data-api/

Amazon Redshift is a fast, scalable, secure, and fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing ETL (extract, transform, and load), business intelligence (BI), and reporting tools. Tens of thousands of customers use Amazon Redshift to process exabytes of data per day and power analytics workloads such as BI, predictive analytics, and real-time streaming analytics.

As more and more data is generated, collected, processed, and stored in many different systems, making the data available to end-users at the right place and the right time is a very important aspect of a data warehouse implementation. A fully automated and highly scalable ETL process helps minimize the operational effort that you must invest in managing the regular ETL pipelines. It also provides timely refreshes of data in your data warehouse.

You can approach the data integration process in two ways:

  • Full load – This method involves completely reloading all the data within a specific data warehouse table or dataset
  • Incremental load – This method focuses on updating or adding only the changed or new data to the existing dataset in a data warehouse
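The difference between the two approaches can be shown with plain dictionaries standing in for tables. The following Python sketch uses invented sample rows and hypothetical helper names; it only illustrates the semantics and is not how AWS DMS or Amazon Redshift perform the load.

```python
def full_load(target: dict, source_snapshot: dict) -> dict:
    """Replace the target contents entirely with the source snapshot."""
    return dict(source_snapshot)


def incremental_load(target: dict, changes: dict) -> dict:
    """Apply only new or changed rows on top of the existing target."""
    merged = dict(target)
    merged.update(changes)
    return merged


target = {1: "bolt", 2: "nut"}
source = {1: "bolt v2", 3: "washer"}  # row 2 was deleted at the source

print(full_load(target, source))         # row 2 disappears from the target
print(incremental_load(target, source))  # row 2 survives: the delete was never tracked
```

When the source has no way to track changes, including deletes, an incremental load can leave stale rows behind, which is why a full load fits that kind of data.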

This post discusses how to automate ingestion of source data that changes completely and has no way to track the changes. This is useful for customers who want to use this data in Amazon Redshift; some examples of such data are products and bills of materials without tracking details at the source.

We show how to build an automatic extract and load process from various relational database systems into a data warehouse for full load only. A full load is performed from SQL Server to Amazon Redshift using AWS Database Migration Service (AWS DMS). When Amazon EventBridge receives a full load completion notification from AWS DMS, ETL processes are run on Amazon Redshift to process data. AWS Step Functions is used to orchestrate this ETL pipeline. Alternatively, you could use Amazon Managed Workflows for Apache Airflow (Amazon MWAA), a managed orchestration service for Apache Airflow that makes it straightforward to set up and operate end-to-end data pipelines in the cloud.

Solution overview

The workflow consists of the following steps:

  1. The solution uses an AWS DMS migration task that replicates the full load dataset from the configured SQL Server source to a target Redshift cluster in a staging area.
  2. AWS DMS publishes the REPLICATION_TASK_STOPPED event to EventBridge when the replication task is complete, which invokes an EventBridge rule.
  3. EventBridge routes the event to a Step Functions state machine.
  4. The state machine calls a Redshift stored procedure through the Redshift Data API, which loads the dataset from the staging area to the target production tables. With this API, you can also access Redshift data with web-based service applications, including AWS Lambda.

The following architecture diagram highlights the end-to-end solution using AWS services.

In the following sections, we demonstrate how to create the full load AWS DMS task, configure the ETL orchestration on Amazon Redshift, create the EventBridge rule, and test the solution.

Prerequisites

To complete this walkthrough, you must have the following prerequisites:

  • An AWS account
  • A SQL Server database configured as a replication source for AWS DMS
  • A Redshift cluster to serve as the target database
  • An AWS DMS replication instance to migrate data from source to target
  • A source endpoint pointing to the SQL Server database
  • A target endpoint pointing to the Redshift cluster

Create the full load AWS DMS task

Complete the following steps to set up your migration task:

  1. On the AWS DMS console, choose Database migration tasks in the navigation pane.
  2. Choose Create task.
  3. For Task identifier, enter a name for your task, such as dms-full-dump-task.
  4. Choose your replication instance.
  5. Choose your source endpoint.
  6. Choose your target endpoint.
  7. For Migration type, choose Migrate existing data.

  8. In the Table mapping section, under Selection rules, choose Add new selection rule.
  9. For Schema, choose Enter a schema.
  10. For Schema name, enter a name (for example, dms_sample).
  11. Keep the remaining settings as default and choose Create task.
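The selection rule chosen in the console is stored by AWS DMS as a JSON table mapping. The following Python sketch builds an equivalent document, which can be handy if you script the task instead of using the console. The helper name selection_rule is hypothetical, and the "%" wildcard (all tables in the schema) is an assumption about what you want to migrate.

```python
import json


def selection_rule(schema_name: str, table_name: str = "%", rule_id: int = 1) -> dict:
    """Build one AWS DMS selection rule that includes the matching tables."""
    return {
        "rule-type": "selection",
        "rule-id": str(rule_id),
        "rule-name": str(rule_id),
        "object-locator": {"schema-name": schema_name, "table-name": table_name},
        "rule-action": "include",
    }


# One rule that includes every table in the dms_sample schema.
table_mappings = {"rules": [selection_rule("dms_sample")]}
table_mappings_json = json.dumps(table_mappings, indent=2)
print(table_mappings_json)
```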

The following screenshot shows your completed task on the AWS DMS console.

Create Redshift tables

Create the following tables on the Redshift cluster using the Redshift query editor:

  • dbo.dim_cust – Stores customer attributes:
CREATE TABLE dbo.dim_cust (
cust_key integer ENCODE az64,
cust_id character varying(10) ENCODE lzo,
cust_name character varying(100) ENCODE lzo,
cust_city character varying(50) ENCODE lzo,
cust_rev_flg character varying(1) ENCODE lzo
)

DISTSTYLE AUTO;
  • dbo.fact_sales – Stores customer sales transactions:
CREATE TABLE dbo.fact_sales (
order_number character varying(20) ENCODE lzo,
cust_key integer ENCODE az64,
order_amt numeric(18,2) ENCODE az64
)

DISTSTYLE AUTO;
  • dbo.fact_sales_stg – Stores daily customer incremental sales transactions:
CREATE TABLE dbo.fact_sales_stg (
order_number character varying(20) ENCODE lzo,
cust_id character varying(10) ENCODE lzo,
order_amt numeric(18,2) ENCODE az64
)

DISTSTYLE AUTO;

Use the following INSERT statements to load sample data into the sales staging table:

insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (100,100,200);
insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (101,100,300);
insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (102,101,25);
insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (103,101,35);
insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (104,102,80);
insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (105,102,45);

Create the stored procedures

In the Redshift query editor, create the following stored procedures to process customer and sales transaction data:

  • sp_load_cust_dim() – This procedure compares the customer dimension with incremental customer data in staging and populates the customer dimension:
CREATE OR REPLACE PROCEDURE dbo.sp_load_cust_dim()
LANGUAGE plpgsql
AS $$
BEGIN
truncate table dbo.dim_cust;
insert into dbo.dim_cust(cust_key,cust_id,cust_name,cust_city) values (1,100,'abc','chicago');
insert into dbo.dim_cust(cust_key,cust_id,cust_name,cust_city) values (2,101,'xyz','dallas');
insert into dbo.dim_cust(cust_key,cust_id,cust_name,cust_city) values (3,102,'yrt','new york');
update dbo.dim_cust
set cust_rev_flg=case when cust_city='new york' then 'Y' else 'N' end
where cust_rev_flg is null;
END;
$$
  • sp_load_fact_sales() – This procedure transforms the incremental order data by joining it with the customer dimension and populates the customer key from the dimension table in the final sales fact table:
CREATE OR REPLACE PROCEDURE dbo.sp_load_fact_sales()
LANGUAGE plpgsql
AS $$
BEGIN
--Process Fact Sales
insert into dbo.fact_sales
select
sales_fct.order_number,
cust.cust_key as cust_key,
sales_fct.order_amt
from dbo.fact_sales_stg sales_fct
--join to customer dim
inner join (select * from dbo.dim_cust) cust on sales_fct.cust_id=cust.cust_id;
END;
$$
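To see what the join in sp_load_fact_sales does without a Redshift cluster, the following Python sketch runs the same shape of query against an in-memory SQLite database. SQLite is only a stand-in here, the dbo schema prefix is dropped, and the sample rows are hypothetical values chosen so that two staging rows match the dimension and one doesn't.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for Redshift, for illustration only
conn.executescript("""
CREATE TABLE dim_cust (cust_key INTEGER, cust_id TEXT, cust_name TEXT);
CREATE TABLE fact_sales_stg (order_number TEXT, cust_id TEXT, order_amt NUMERIC);
CREATE TABLE fact_sales (order_number TEXT, cust_key INTEGER, order_amt NUMERIC);
INSERT INTO dim_cust VALUES (1, '100', 'abc'), (2, '101', 'xyz');
-- Hypothetical staging rows; cust_id '999' has no dimension row.
INSERT INTO fact_sales_stg VALUES ('A1', '100', 200), ('A2', '101', 25), ('A3', '999', 80);
""")

# Same shape as the insert in sp_load_fact_sales: swap cust_id for cust_key.
conn.execute("""
INSERT INTO fact_sales
SELECT s.order_number, c.cust_key, s.order_amt
FROM fact_sales_stg s
INNER JOIN dim_cust c ON s.cust_id = c.cust_id
""")
fact_rows = conn.execute(
    "SELECT order_number, cust_key, order_amt FROM fact_sales ORDER BY order_number"
).fetchall()
print(fact_rows)
```

Because the join is an inner join, staging rows whose cust_id has no match in the dimension are dropped silently, so it's worth checking that staging and dimension keys line up before relying on the fact table.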

Create the Step Functions state machine

Complete the following steps to create the state machine redshift-elt-load-customer-sales. This state machine is invoked as soon as the AWS DMS full load task for the customer table is complete.

  1. On the Step Functions console, choose State machines in the navigation pane.
  2. Choose Create state machine.
  3. For Template, choose Blank.
  4. On the Actions dropdown menu, choose Import definition to import the workflow definition of the state machine.

  5. Open your preferred text editor and save the following code in a file with the ASL file extension (for example, redshift-elt-load-customer-sales.ASL). Provide your Redshift cluster ID and the secret ARN for your Redshift cluster.
{
"Comment": "State Machine to process ETL for Customer Sales Transactions",
"StartAt": "Load_Customer_Dim",
"States": {
"Load_Customer_Dim": {
"Type": "Task",
"Parameters": {
"ClusterIdentifier": "redshiftcluster-abcd",
"Database": "dev",
"Sql": "call dbo.sp_load_cust_dim()",
"SecretArn": "arn:aws:secretsmanager:us-west-2:xxx:secret:rs-cluster-secret-abcd"
},
"Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
"Next": "Wait on Load_Customer_Dim"
},
"Wait on Load_Customer_Dim": {
"Type": "Wait",
"Seconds": 30,
"Next": "Check_Status_Load_Customer_Dim"
},

"Check_Status_Load_Customer_Dim": {
"Type": "Task",
"Next": "Choice",
"Parameters": {
"Id.$": "$.Id"
},

"Resource": "arn:aws:states:::aws-sdk:redshiftdata:describeStatement"
},

"Choice": {
"Type": "Choice",
"Choices": [
{
"Not": {
"Variable": "$.Status",
"StringEquals": "FINISHED"
},
"Next": "Wait on Load_Customer_Dim"
}
],
"Default": "Load_Sales_Fact"
},
"Load_Sales_Fact": {
"Type": "Task",
"End": true,
"Parameters": {
"ClusterIdentifier": "redshiftcluster-abcd",
"Database": "dev",
"Sql": "call dbo.sp_load_fact_sales()",
"SecretArn": "arn:aws:secretsmanager:us-west-2:xxx:secret:rs-cluster-secret-abcd"
},

"Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement"
}
}
}
  6. Choose Choose file and upload the ASL file to create a new state machine.

  7. For State machine name, enter a name for the state machine (for example, redshift-elt-load-customer-sales).
  8. Choose Create.

After the successful creation of the state machine, you can verify the details as shown in the following screenshot.

The following diagram illustrates the state machine workflow.

The state machine includes the following steps:

  • Load_Customer_Dim – Performs the following actions:
    • Passes the stored procedure sp_load_cust_dim to the execute-statement API to run in the Redshift cluster to load the incremental data for the customer dimension
    • Returns the identifier of the SQL statement to the state machine
  • Wait on Load_Customer_Dim – Waits for 30 seconds
  • Check_Status_Load_Customer_Dim – Invokes the Data API’s describeStatement to get the status of the SQL statement
  • Choice – Routes the next step of the ETL workflow depending on that status:
    • FINISHED – Proceeds to Load_Sales_Fact, which passes the stored procedure sp_load_fact_sales to the execute-statement API to run in the Redshift cluster; this loads the incremental data for fact sales and populates the corresponding keys from the customer dimension
    • All other statuses – Goes back to the Wait on Load_Customer_Dim step to wait for the SQL statement to finish
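The submit, wait, check, and choose loop described above can be sketched in a few lines of Python. This is a dry-run model rather than the state machine itself: the submit and describe callables are hypothetical stand-ins that could wrap the Data API's ExecuteStatement and DescribeStatement calls, and stopping on FAILED or ABORTED is an assumption added here (the definition shown earlier waits again on every status other than FINISHED).

```python
import time
from typing import Callable


def run_and_wait(
    submit: Callable[[], str],
    describe: Callable[[str], str],
    wait_seconds: float = 30,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Submit a statement, then poll its status until it finishes."""
    statement_id = submit()                 # Load_Customer_Dim
    while True:
        sleep(wait_seconds)                 # Wait on Load_Customer_Dim
        status = describe(statement_id)     # Check_Status_Load_Customer_Dim
        if status == "FINISHED":            # Choice: default branch
            return statement_id
        if status in ("FAILED", "ABORTED"):
            # Assumption: stop on terminal errors instead of waiting again.
            raise RuntimeError(f"statement {statement_id} ended as {status}")


# Fake Data API for a dry run: two in-progress polls, then FINISHED.
statuses = iter(["SUBMITTED", "STARTED", "FINISHED"])
finished_id = run_and_wait(
    submit=lambda: "stmt-1",
    describe=lambda _id: next(statuses),
    sleep=lambda _seconds: None,
)
```

Injecting the sleep function keeps the dry run instant; with the default, each poll waits 30 seconds, matching the Wait state.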

The state machine redshift-elt-load-customer-sales loads the dim_cust, fact_sales_stg, and fact_sales tables when invoked by the EventBridge rule.

As an optional step, you can set up event-based notifications on completion of the state machine to invoke any downstream actions, such as Amazon Simple Notification Service (Amazon SNS) or further ETL processes.

Create an EventBridge rule

EventBridge sends event notifications to the Step Functions state machine when the full load is complete. You can also turn event notifications on or off in EventBridge.

Complete the following steps to create the EventBridge rule:

  1. On the EventBridge console, in the navigation pane, choose Rules.
  2. Choose Create rule.
  3. For Name, enter a name (for example, dms-test).
  4. Optionally, enter a description for the rule.
  5. For Event bus, choose the event bus to associate with this rule. If you want this rule to match events that come from your account, select AWS default event bus. When an AWS service in your account emits an event, it always goes to your account’s default event bus.
  6. For Rule type, choose Rule with an event pattern.
  7. Choose Next.
  8. For Event source, choose AWS events or EventBridge partner events.
  9. For Method, select Use pattern form.
  10. For Event source, choose AWS services.
  11. For AWS service, choose Database Migration Service.
  12. For Event type, choose All Events.
  13. For Event pattern, enter the following JSON expression, which looks for the REPLICATION_TASK_STOPPED status for the AWS DMS task:
{
"source": ["aws.dms"],
"detail": {
"eventId": ["DMS-EVENT-0079"],
"eventType": ["REPLICATION_TASK_STOPPED"],
"detailMessage": ["Stop Reason FULL_LOAD_ONLY_FINISHED"],
"type": ["REPLICATION_TASK"],
"category": ["StateChange"]
}
}

  14. For Target type, choose AWS service.
  15. For AWS service, choose Step Functions state machine.
  16. For State machine name, enter redshift-elt-load-customer-sales.
  17. Choose Create rule.
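To build intuition for how the event pattern selects events, the following Python sketch implements a simplified version of exact-value matching: each field named in the pattern must be present in the event, and its value must be one of the listed values. It ignores other EventBridge matching features, the pattern is trimmed to three fields, and both sample events are hypothetical.

```python
def matches(pattern: dict, event: dict) -> bool:
    """Simplified exact-value matching: every pattern field must be present
    in the event, and the event value must be one of the listed values."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        if isinstance(expected, dict):
            if not isinstance(event[key], dict) or not matches(expected, event[key]):
                return False
        elif event[key] not in expected:
            return False
    return True


pattern = {
    "source": ["aws.dms"],
    "detail": {
        "eventType": ["REPLICATION_TASK_STOPPED"],
        "detailMessage": ["Stop Reason FULL_LOAD_ONLY_FINISHED"],
    },
}

# Hypothetical events, trimmed to the fields the pattern inspects.
full_load_done = {
    "source": "aws.dms",
    "detail": {
        "eventType": "REPLICATION_TASK_STOPPED",
        "detailMessage": "Stop Reason FULL_LOAD_ONLY_FINISHED",
        "category": "StateChange",
    },
}
task_started = {"source": "aws.dms", "detail": {"eventType": "REPLICATION_TASK_STARTED"}}

print(matches(pattern, full_load_done))
print(matches(pattern, task_started))
```

Fields that appear in the event but not in the pattern (category in the first sample) don't affect the result, so a narrower pattern matches more events.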

The following screenshot shows the details of the rule created for this post.

Test the solution

Run the task and wait for the workload to complete. This workflow moves the full volume data from the source database to the Redshift cluster.

The following screenshot shows the load statistics for the customer table full load.

AWS DMS provides notifications when an AWS DMS event occurs, for example the completion of a full load or if a replication task has stopped.

After the full load is complete, AWS DMS sends events to the default event bus for your account. The following screenshot shows an example of invoking the target Step Functions state machine using the rule you created.

We configured the Step Functions state machine as a target in EventBridge. This enables EventBridge to invoke the Step Functions workflow in response to the completion of an AWS DMS full load task.

Validate the state machine orchestration

When the entire customer sales data pipeline is complete, you may go through the entire event history for the Step Functions state machine, as shown in the following screenshots.

Limitations

The Data API and Step Functions AWS SDK integration offers a robust mechanism to build highly distributed ETL applications with minimal developer overhead. Consider the following limitations when using the Data API and Step Functions:

Clean up

To avoid incurring future charges, delete the Redshift cluster, AWS DMS full load task, AWS DMS replication instance, and Step Functions state machine that you created as part of this post.

Conclusion

In this post, we demonstrated how to build an ETL orchestration for full loads from operational data stores using the Redshift Data API, EventBridge, Step Functions with AWS SDK integration, and Redshift stored procedures.

To learn more about the Data API, see Using the Amazon Redshift Data API to interact with Amazon Redshift clusters and Using the Amazon Redshift Data API.


About the authors

Ritesh Kumar Sinha is an Analytics Specialist Solutions Architect based out of San Francisco. He has helped customers build scalable data warehousing and big data solutions for over 16 years. He loves to design and build efficient end-to-end solutions on AWS. In his spare time, he loves reading, walking, and doing yoga.

Praveen Kadipikonda is a Senior Analytics Specialist Solutions Architect at AWS based out of Dallas. He helps customers build efficient, performant, and scalable analytic solutions. He has worked with building databases and data warehouse solutions for over 15 years.

Jagadish Kumar (Jag) is a Senior Specialist Solutions Architect at AWS focused on Amazon OpenSearch Service. He is deeply passionate about Data Architecture and helps customers build analytics solutions at scale on AWS.

How Cloudinary transformed their petabyte scale streaming data lake with Apache Iceberg and AWS Analytics

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/how-cloudinary-transformed-their-petabyte-scale-streaming-data-lake-with-apache-iceberg-and-aws-analytics/

This post is co-written with Amit Gilad, Alex Dickman and Itay Takersman from Cloudinary. 

Enterprises and organizations across the globe want to harness the power of data to make better decisions by putting data at the center of every decision-making process. Data-driven decisions lead to more effective responses to unexpected events, increase innovation, and allow organizations to create better experiences for their customers. However, data services have historically held control over their customers’ data. Despite the potential separation of storage and compute in terms of architecture, they are often effectively fused together. This fusion gives vendors authority over a diverse range of workloads by virtue of owning the data. This authority extends across areas such as business intelligence, data engineering, and machine learning, thus limiting the tools and capabilities that can be used.

The landscape of data technology is swiftly advancing, frequently driven by projects led by the open source community in general and the Apache foundation specifically. This evolving open source landscape allows customers complete control over data storage, processing engines, and permissions, significantly expanding the array of available options. This approach also encourages vendors to compete based on the value they provide to businesses, rather than relying on potential fusing of storage and compute. This fosters a competitive environment that prioritizes customer acquisition and prompts vendors to differentiate themselves through unique features and offerings that cater directly to the specific needs and preferences of their clientele.

A modern data strategy redefines and enables sharing data across the enterprise and allows for both reading and writing of a singular instance of the data using an open table format. The open table format accelerates companies’ adoption of a modern data strategy because it allows them to use various tools on top of a single copy of the data.

Cloudinary is a cloud-based media management platform that provides a comprehensive set of tools and services for managing, optimizing, and delivering images, videos, and other media assets on websites and mobile applications. It’s widely used by developers, content creators, and businesses to streamline their media workflows, enhance user experiences, and optimize content delivery.

In this blog post, we dive into different data aspects and how Cloudinary addresses the two concerns of vendor lock-in and cost-efficient data analytics by using Apache Iceberg, Amazon Simple Storage Service (Amazon S3), Amazon Athena, Amazon EMR, and AWS Glue.

Short overview of Cloudinary’s infrastructure

Cloudinary’s infrastructure handles over 20 billion requests daily, with every request generating event logs. Various data pipelines process these logs, producing petabytes (PBs) of data per month; the data is first processed and stored on Amazon S3 and then stored in Snowflake Data Cloud. These datasets serve as a critical resource for Cloudinary internal teams and data science groups to allow detailed analytics and advanced use cases.

Until recently, this data was mostly prepared by automated processes and aggregated into results tables, used by only a few internal teams. Cloudinary struggled to use this data for additional teams who had more online, real time, lower-granularity, dynamic usage requirements. Making petabytes of data accessible for ad-hoc reports became a challenge as query time increased and costs skyrocketed along with growing compute resource requirements. Cloudinary data retention for the specific analytical data discussed in this post was defined as 30 days. However, new use cases drove the need for increased retention, which would have led to significantly higher cost.

The data flows from Cloudinary log providers into files written to Amazon S3, and each new file is announced through an event pushed to Amazon Simple Queue Service (Amazon SQS). Those SQS events are ingested by a Spark application running on Amazon EMR, which parses and enriches the data. The processed logs are written in Apache Parquet format back to Amazon S3 and then automatically loaded to a Snowflake table using Snowpipe.

Why Cloudinary chose Apache Iceberg

Apache Iceberg is a high-performance table format for huge analytic workloads. Apache Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for processing engines such as Apache Spark, Trino, Apache Flink, Presto, Apache Hive, and Impala to safely work with the same tables at the same time.

A solution based on Apache Iceberg encompasses complete data management, featuring simple built-in table optimization capabilities within an existing storage solution. These capabilities, along with the ability to use multiple engines on top of a singular instance of data, help avoid the need for data movement between various solutions.

While exploring the various controls and options in configuring Apache Iceberg, Cloudinary had to adapt its data to use AWS Glue Data Catalog, as well as move a significant volume of data to Apache Iceberg on Amazon S3. At this point it became clear that costs would be significantly reduced, and while it had been a key factor since the planning phase, it was now possible to get concrete numbers. One example is that Cloudinary was now able to store 6 months of data for the same storage price that was previously paid for storing 1 month of data. This cost saving was achieved by using Amazon S3 storage tiers as well as improved compression (Zstandard), further enhanced by the fact that Parquet files were sorted.

Since Apache Iceberg is well supported by AWS data services and Cloudinary was already using Spark on Amazon EMR, they could integrate writing to Data Catalog and start an additional Spark cluster to handle data maintenance and compaction. As exploration continued with Apache Iceberg, some interesting performance metrics were found. For example, for certain queries, Athena runtime was 2x–4x faster than Snowflake.

Integration of Apache Iceberg

The integration of Apache Iceberg was done before loading data to Snowflake. The data is written to an Iceberg table using Apache Parquet data format and AWS Glue as the data catalog. In addition, a Spark application on Amazon EMR runs in the background handling compaction of the Parquet files to optimal size for querying through various tools such as Athena, Trino running on top of EMR, and Snowflake.

Challenges faced

Cloudinary faced several challenges while building its petabyte-scale data lake, including:

  • Determining optimal table partitioning
  • Optimizing ingestion
  • Solving the small files problem to improve query performance
  • Cost effectively maintaining Apache Iceberg tables
  • Choosing the right query engine

In this section, we describe each of these challenges and the solutions implemented to address them. Many of the tests to check performance and volumes of data scanned used Athena because it provides a simple-to-use, fully serverless, cost-effective interface without the need to set up infrastructure.

Determining optimal table partitioning

Apache Iceberg makes partitioning easier for the user by implementing hidden partitioning. Rather than forcing the user to supply a separate partition filter at query time, Iceberg tables can be configured to map regular columns to the partition keys. Users don’t need to maintain partition columns or even understand the physical table layout to get fast and accurate query results.

Iceberg has several partitioning options. One example is when partitioning timestamps, which can be done by year, month, day, and hour. Iceberg keeps track of the relationship between a column value and its partition without requiring additional columns. Iceberg can also partition categorical column values by identity, hash buckets, or truncation. In addition, Iceberg partitioning is user-friendly because it also allows partition layouts to evolve over time without breaking pre-written queries. For example, when using daily partitions and the query pattern changes over time to be based on hours, it’s possible to evolve the partitions to hourly ones, thus making queries more efficient. When evolving such a partition definition, the data in the table prior to the change is unaffected, as is its metadata. Only data that is written to the table after the evolution is partitioned with the new definition, and the metadata for this new set of data is kept separately. When querying, each partition layout’s respective metadata is used to identify the files that need to be accessed; this is called split-planning. Split-planning is one of many Iceberg features that are made possible due to the table metadata, which creates a separation between the physical and the logical storage. This concept makes Iceberg extremely versatile.

Determining the correct partitioning is key when working with large data sets because it affects query performance and the amount of data being scanned. Because this migration was from existing tables from Snowflake native storage to Iceberg, it was crucial to test and provide a solution with the same or better performance for the existing workload and types of queries.

These tests were possible due to Apache Iceberg’s:

  1. Hidden partitions
  2. Partition transformations
  3. Partition evolution

These allowed altering table partitions and testing which strategy works best without data rewrite.

Here are a few partitioning strategies that were tested:

  1. PARTITIONED BY (days(day), customer_id)
  2. PARTITIONED BY (days(day), hour(timestamp))
  3. PARTITIONED BY (days(day), bucket(N, customer_id))
  4. PARTITIONED BY (days(day))
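To show what these transforms compute for a single row, the following Python sketch derives the partition tuple for three of the layouts. The day and hour transforms count whole days or hours since the Unix epoch. The bucket function is only a stand-in: Iceberg hashes values with 32-bit Murmur3, whereas this sketch uses CRC32 to stay dependency-free, so the bucket numbers won't match a real table. The row and its column names are hypothetical.

```python
from datetime import datetime, timezone
import zlib

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def days(ts: datetime) -> int:
    """Day transform: whole days since 1970-01-01 (UTC)."""
    return (ts - EPOCH).days


def hours(ts: datetime) -> int:
    """Hour transform: whole hours since 1970-01-01 00:00 (UTC)."""
    return int((ts - EPOCH).total_seconds() // 3600)


def bucket(n: int, value: str) -> int:
    """Bucket transform stand-in. Iceberg hashes with 32-bit Murmur3;
    CRC32 is used here only to keep the sketch dependency-free."""
    return zlib.crc32(value.encode("utf-8")) % n


ts = datetime(2024, 3, 15, 13, 45, tzinfo=timezone.utc)
row = {"timestamp": ts, "customer_id": "cust-42"}

by_day = (days(row["timestamp"]),)
by_day_hour = (days(row["timestamp"]), hours(row["timestamp"]))
by_day_bucket = (days(row["timestamp"]), bucket(16, row["customer_id"]))
```

Because the partition values are derived from regular columns, a query that filters on the timestamp can be pruned to the matching partitions without the user naming a partition column.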

Each partitioning strategy that was reviewed generated significantly different results, both during writing and at query time. After careful analysis of the results, Cloudinary decided to partition the data by day and combine it with sorting, which allows them to sort data within partitions, as elaborated in the compaction section.

Optimizing ingestion

Cloudinary receives billions of events in files from its providers in various formats and sizes and stores those on Amazon S3, resulting in terabytes of data processed and stored every day.

Because the data doesn’t come in a consistent manner and it’s not possible to predict the incoming rate and file size of the data, it was necessary to find a way of keeping cost down while maintaining high throughput.

This was achieved by using EventBridge to push each file received into Amazon SQS, where it was processed using Spark running on Amazon EMR in batches. This allowed processing the incoming data at high throughput and scale clusters according to queue size while keeping costs down.

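Example of fetching up to 100 messages (files) from Amazon SQS with Spark:

// Assumed to be defined in a DistributedSQSReceiver object so that each executor builds its own client
val client = AmazonSQSClientBuilder.standard().withRegion("us-east-1").build()
// Each call fetches up to 10 messages, the SQS maximum per request
def getMessageBatch: Iterable[Message] = DistributedSQSReceiver.client.receiveMessage(new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(10)).getMessages.asScala
// Run 10 tasks in parallel, each fetching one batch
sparkSession.sparkContext.parallelize(1 to 10, 10).map(_ => getMessageBatch.toList).collect().flatten.toList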
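The batch count in this example follows from a simple limit: a single SQS ReceiveMessage call returns at most 10 messages. A short Python sketch of the arithmetic, with a hypothetical helper name:

```python
import math

SQS_MAX_MESSAGES_PER_RECEIVE = 10  # upper limit for a single ReceiveMessage call


def receive_calls_needed(target_messages: int) -> int:
    """Number of ReceiveMessage calls needed to fetch up to target_messages."""
    return math.ceil(target_messages / SQS_MAX_MESSAGES_PER_RECEIVE)


calls = receive_calls_needed(100)
```

A call can return fewer messages than the maximum, so the result is the minimum number of calls, and 100 is an upper bound on what ten parallel calls bring back.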

When dealing with a high data ingestion rate for a specific partition prefix, Amazon S3 might potentially throttle requests and return a 503 status code (service unavailable). To address this scenario, Cloudinary used an Iceberg table property called write.object-storage.enabled, which incorporates a hash prefix into the stored Amazon S3 object path. This approach was deemed efficient and effectively mitigated Amazon S3 throttling problems.
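The effect of a hash prefix can be illustrated with a small Python sketch. It is not Iceberg's exact hash or path format; the hashed_key helper, the SHA-1 hash, and the file names are assumptions chosen only to show how object keys that would share one prefix get spread across many.

```python
import hashlib
from collections import Counter


def hashed_key(data_root: str, partition_path: str, file_name: str) -> str:
    """Put a short hash of the file name in front of the partition path.
    Illustrative layout only, not Iceberg's exact hash or path format."""
    prefix = hashlib.sha1(file_name.encode("utf-8")).hexdigest()[:8]
    return f"{data_root}/{prefix}/{partition_path}/{file_name}"


files = [f"part-{i:05d}.parquet" for i in range(1000)]

# Without hashing, every file written for one day shares a single key prefix.
plain = Counter(f"data/day=2024-03-15/{name}".rsplit("/", 1)[0] for name in files)
# With hashing, the same files spread across many distinct key prefixes.
hashed = Counter(hashed_key("data", "day=2024-03-15", name).split("/")[1] for name in files)

print(len(plain), len(hashed))
```

Spreading writes over many prefixes matters because Amazon S3 request limits apply per prefix, so a single hot day partition no longer concentrates all requests in one place.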

Solving the small file problem and improving query performance

In modern data architectures, stream processing engines such as Amazon EMR are often used to ingest continuous streams of data into data lakes using Apache Iceberg. Streaming ingestion to Iceberg tables can suffer from two problems:

  • It generates many small files, which lead to longer query planning and in turn can impact read performance.
  • It clusters data poorly, which can make file pruning less effective. This typically occurs in the streaming process when there is insufficient new data to generate optimal file sizes for reading, such as 512 MB.

Because partitioning is a key factor in the number of files produced, Cloudinary’s data is time based, and most queries use a time filter, Cloudinary decided to optimize its data lake in multiple ways.

First, Cloudinary set all the necessary configurations that helped reduce the number of files while appending data in the table by setting write.target-file-size-bytes, which allows defining the default target file size. Setting spark.sql.shuffle.partitions in Spark can reduce the number of output files by controlling the number of partitions used during shuffle operations, which affects how data is distributed across tasks, consequently minimizing the number of output files generated after transformations or aggregations.

Because the above approach only addressed the small file problem but didn’t eliminate it entirely, Cloudinary used another capability of Apache Iceberg that can compact data files in parallel using Spark with the rewriteDataFiles action. This action combines small files into larger files to reduce metadata overhead and minimize the amount of Amazon S3 GetObject API operation usage.

Here is where it can get complicated. When running compaction, Cloudinary needed to choose which strategy to apply out of the three that Apache Iceberg offers; each one having its own advantages and disadvantages:

  1. Binpack – simply rewrites smaller files to a target size
  2. Sort – data sorting based on different columns
  3. Z-order – a technique to colocate related data in the same set of files

At first, the Binpack compaction strategy was evaluated. This strategy works fastest and combines small files together to reach the defined target file size. After running it, a significant improvement in query performance was observed.
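The idea behind bin packing can be sketched in Python as a greedy grouping of small files up to a target size. This is an illustration of the concept, not Iceberg's actual file planner, and the file sizes are made up.

```python
def binpack(file_sizes_mb: list[int], target_mb: int) -> list[list[int]]:
    """Greedily group files so each group stays at or under target_mb.
    A file already at or above the target forms its own group."""
    groups: list[list[int]] = []
    current: list[int] = []
    for size in file_sizes_mb:
        if current and sum(current) + size > target_mb:
            groups.append(current)
            current = []
        current.append(size)
    if current:
        groups.append(current)
    return groups


small_files = [40, 60, 120, 30, 200, 90, 10, 150, 64]   # sizes in MB
groups = binpack(small_files, target_mb=512)
print(len(small_files), "files ->", len(groups), "files")
```

Each group is rewritten as one larger file. No sorting or shuffling is involved, which is why this strategy is the cheapest of the three.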

As mentioned previously, data was partitioned by day and most queries ran on a specific time range. Because data comes from external vendors and sometimes arrives late, it was noticed that when running queries on compacted days, a lot of data was being scanned, because the specific time range could reside across many files. The query engine (Athena, Snowflake, and Trino with Amazon EMR) needed to scan the entire partition to fetch only the relevant rows.

To increase query performance even further, Cloudinary decided to change the compaction process to use sort, so now data is partitioned by day and sorted by requested_at (timestamp when the action occurred) and customer ID.

This strategy is costlier for compaction because it needs to shuffle the data in order to sort it. However, after adopting this sort strategy, two things were noticeable: the same queries that ran before now scanned around 50 percent less data, and query run time was improved by 30 percent to 50 percent.
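Why sorting reduces the data scanned can be shown with a toy Python model of file pruning based on per-file minimum and maximum values. The numbers are synthetic and exaggerate the effect; they aren't meant to reproduce the figures above.

```python
import random


def write_files(rows: list[int], rows_per_file: int) -> list[tuple[int, int]]:
    """Split rows into files and keep each file's (min, max) timestamp."""
    chunks = [rows[i:i + rows_per_file] for i in range(0, len(rows), rows_per_file)]
    return [(min(chunk), max(chunk)) for chunk in chunks]


def files_to_scan(stats: list[tuple[int, int]], start: int, end: int) -> int:
    """Count files whose [min, max] range overlaps the query range."""
    return sum(1 for lo, hi in stats if lo <= end and hi >= start)


random.seed(7)
# One day of events as second-of-day timestamps, arriving out of order.
arrival_order = random.sample(range(86_400), 86_400)

unsorted_files = write_files(arrival_order, rows_per_file=3_600)
sorted_files = write_files(sorted(arrival_order), rows_per_file=3_600)

# Query a one-hour window: 10:00 to 11:00.
start, end = 36_000, 39_599
print(files_to_scan(unsorted_files, start, end), files_to_scan(sorted_files, start, end))
```

When rows are written in arrival order, almost every file spans the whole day, so a time filter can't skip any of them. After sorting, each file covers a narrow time range and most files are pruned before any data is read.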

Cost effectively maintaining Apache Iceberg tables

Maintaining Apache Iceberg tables is crucial for optimizing performance, reducing storage costs, and ensuring data integrity. Iceberg provides several maintenance operations to keep your tables in good shape. By incorporating these operations, Cloudinary was able to cost-effectively manage its Iceberg tables.

Expire snapshots

Each write to an Iceberg table creates a new snapshot, or version, of a table. Snapshots can be used for time-travel queries, or the table can be rolled back to any valid snapshot.

Regularly expiring snapshots is recommended to delete data files that are no longer needed and to keep the size of table metadata small. Cloudinary decided to retain snapshots for up to 7 days to allow easier troubleshooting and handling of corrupted data, which sometimes arrives from external sources and isn’t identified upon arrival:

SparkActions.get().expireSnapshots(iceTable).expireOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7)).execute()
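The retention rule boils down to comparing each snapshot's commit time with a cutoff of "now minus 7 days". The following Python sketch models that selection; the snapshot IDs and timestamps are hypothetical, and keeping the current snapshot regardless of age is a simplification of Iceberg's retention behavior.

```python
from datetime import datetime, timedelta, timezone


def snapshots_to_expire(snapshots: dict[int, datetime], current_id: int,
                        now: datetime, retention: timedelta) -> list[int]:
    """Return IDs of snapshots older than the retention window.
    The current snapshot is always kept, whatever its age."""
    cutoff = now - retention   # an absolute point in time, not a duration
    return sorted(sid for sid, committed_at in snapshots.items()
                  if committed_at < cutoff and sid != current_id)


now = datetime(2024, 3, 15, tzinfo=timezone.utc)
snapshots = {
    1: now - timedelta(days=10),
    2: now - timedelta(days=8),
    3: now - timedelta(days=3),
    4: now - timedelta(hours=1),   # current snapshot
}
expired = snapshots_to_expire(snapshots, current_id=4, now=now, retention=timedelta(days=7))
```

Snapshots 3 and 4 stay available for time travel and rollback; data files referenced only by snapshots 1 and 2 become eligible for deletion.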

Remove old metadata files

Iceberg keeps track of table metadata using JSON files. Each change to a table produces a new metadata file to provide atomicity.

Old metadata files are kept for history by default. Tables with frequent commits, like those written by streaming jobs, might need to regularly clean metadata files.

Configuring the following properties will make sure that only the latest ten metadata files are kept and anything older is deleted.

write.metadata.delete-after-commit.enabled=true 
write.metadata.previous-versions-max=10

Delete orphan files

In Spark and other distributed processing engines, when tasks or jobs fail, they might leave behind files that aren’t accounted for in the table metadata. Moreover, in certain instances, the standard snapshot expiration process might fail to identify files that are no longer necessary and not delete them.

Apache Iceberg offers a deleteOrphanFiles action that will take care of unreferenced files. This action might take a long time to complete if there are a large number of files in the data and metadata directories. A metadata or data file is considered orphan if it isn’t reachable by any valid snapshot. The set of actual files is built by listing the underlying storage using the Amazon S3 ListObjects operation, which makes this operation expensive. It’s recommended to run this operation periodically to avoid increased storage usage; however, too frequent runs can potentially offset this cost benefit.
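Conceptually, orphan detection is a set difference between what the storage listing returns and what valid snapshots reference. The following Python sketch models it with hypothetical paths; the minimum-age guard and its 3-day value are illustrative assumptions that protect files from writes still in progress.

```python
from datetime import datetime, timedelta, timezone


def find_orphans(listed: dict[str, datetime], referenced: set[str],
                 now: datetime, min_age: timedelta) -> list[str]:
    """Files present in storage, not reachable from any valid snapshot,
    and old enough that they cannot belong to a write still in progress."""
    cutoff = now - min_age
    return sorted(path for path, modified in listed.items()
                  if path not in referenced and modified < cutoff)


now = datetime(2024, 3, 15, tzinfo=timezone.utc)
# Hypothetical result of listing the table location in Amazon S3.
listed = {
    "data/a.parquet": now - timedelta(days=9),
    "data/b.parquet": now - timedelta(days=9),      # left behind by a failed task
    "data/c.parquet": now - timedelta(minutes=5),   # uncommitted, write in flight
}
referenced = {"data/a.parquet"}   # reachable from valid snapshots

orphans = find_orphans(listed, referenced, now, min_age=timedelta(days=3))
```

The listing step is the expensive part on a large table, because every object under the table location has to be enumerated before the comparison can run.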

A good example of how critical it is to run this procedure is to look at the following diagram, which shows how this procedure removed 112 TB of storage.

Rewriting manifest files

Apache Iceberg uses metadata in its manifest list and manifest files to speed up query planning and to prune unnecessary data files. Manifests in the metadata tree are automatically compacted in the order that they’re added, which makes queries faster when the write pattern aligns with read filters.

If a table’s write pattern doesn’t align with the query read filter pattern, metadata can be rewritten to re-group data files into manifests using rewriteManifests.

Although Cloudinary already had a compaction process that optimized data files, they noticed that manifest files also required optimization. In certain cases, Cloudinary had over 300 manifest files, which were small (often under 8 MB), and because of late-arriving data, manifest files pointed to data in different partitions. As a result, query planning took 12 seconds for each query.

Cloudinary initiated a separate scheduled rewriteManifests process. After it ran, the number of manifest files dropped to approximately 170, and because manifests were better aligned with query filters (based on partitions), query planning became three times faster, at approximately 4 seconds.
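
To see why regrouping helps, consider the following toy model in Python. It is a simplified sketch, not how Iceberg stores or reads manifests: each manifest is modeled as a plain list of (partition, data file) pairs, and the function names manifests_to_open and regroup_by_partition are made up for the illustration.

```python
from collections import defaultdict


def manifests_to_open(manifests, wanted_partition):
    """Count manifests that reference at least one file in the wanted partition."""
    return sum(
        1
        for files in manifests
        if any(partition == wanted_partition for partition, _ in files)
    )


def regroup_by_partition(manifests):
    """Rewrite manifests so each one holds files from a single partition."""
    grouped = defaultdict(list)
    for files in manifests:
        for partition, path in files:
            grouped[partition].append((partition, path))
    return [grouped[partition] for partition in sorted(grouped)]


# Late-arriving data means most manifests reference several days.
manifests = [
    [("2024-01-01", "f1"), ("2024-01-02", "f2")],
    [("2024-01-01", "f3"), ("2024-01-03", "f4")],
    [("2024-01-02", "f5"), ("2024-01-01", "f6")],
]

before = manifests_to_open(manifests, "2024-01-01")
after = manifests_to_open(regroup_by_partition(manifests), "2024-01-01")
print(before, after)
```

In this model, a query that filters on a single day has to open every manifest before the rewrite and only one afterward, which is the effect that shortens query planning.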

Choosing the right query engine

As part of Cloudinary’s exploration of query engines, they first outlined several key performance indicators (KPIs) to guide their search, including support for Apache Iceberg alongside integration with existing data sources such as MySQL and Snowflake, the availability of a web interface for effortless one-time queries, and cost optimization. In line with these criteria, they opted to evaluate Trino on Amazon EMR, Athena, and Snowflake with Apache Iceberg support (available at that time as a Private Preview). This approach allowed each solution to be assessed against the defined KPIs, giving a comprehensive understanding of its capabilities and suitability for Cloudinary’s requirements.

Two of the more quantifiable KPIs that Cloudinary was planning to evaluate were cost and performance. Cloudinary realized early in the process that different queries and usage types can potentially benefit from different runtime engines. They decided to focus on four runtime engines.

Engine                                | Details
Snowflake native                      | XL data warehouse on top of data stored within Snowflake
Snowflake with Apache Iceberg support | XL data warehouse on top of data stored in S3 in Apache Iceberg tables
Athena                                | On-demand mode
Amazon EMR Trino                      | Open source Trino on top of an eight-node (m6g.12xl) cluster

The test included four types of queries that represent different production workloads that Cloudinary is running. They’re ordered by size and complexity, from the simplest to the heaviest and most complex.

Query | Description | Data scanned | Returned results set
Q1 | Multi-day aggregation on a single tenant | Single digit GBs | <10 rows
Q2 | Single-day aggregation by tenant across multiple tenants | Dozens of GBs | 100 thousand rows
Q3 | Multi-day aggregation across multiple tenants | Hundreds of GBs | <10 rows
Q4 | Heavy series of aggregations and transformations on a multi-tenant dataset to derive access metrics | Single digit TBs | >1 billion rows

The following graphs show the cost and performance of the four engines across the different queries. To avoid chart scaling issues, all costs and query durations were normalized based on Trino running on Amazon EMR. Cloudinary considered Query 4 to be less suitable for Athena because it involved processing and transforming extremely large volumes of complex data.

Some important aspects to consider are:

  • Cost for EMR running Trino was derived based on query duration only, without considering cluster setup, which on average takes just under 5 minutes.
  • Cost for Snowflake (both options) was derived based on query duration only, without considering cold start (more than 10 seconds on average) and a Snowflake warehouse minimum charge of 1 minute.
  • Cost for Athena was based on the amount of data scanned; Athena doesn’t require cluster setup and the query queue time is less than 1 second.
  • All costs are based on list on-demand (OD) prices.
  • Snowflake prices are based on Standard edition.
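
The two billing models in the preceding list can be sketched as follows. This is a minimal illustration, not the calculation Cloudinary used: the function names are made up, the rates are placeholder values chosen to keep the arithmetic simple rather than actual list prices, and a terabyte is assumed to be 10^12 bytes.

```python
BYTES_PER_TB = 10**12  # assumption: decimal terabytes


def duration_cost(seconds, hourly_rate, minimum_seconds=0):
    """Cost of an engine billed by running time, with an optional minimum charge."""
    billed_seconds = max(seconds, minimum_seconds)
    return hourly_rate * billed_seconds / 3600


def scan_cost(bytes_scanned, price_per_tb):
    """Cost of an engine billed by the amount of data scanned."""
    return price_per_tb * bytes_scanned / BYTES_PER_TB


# Hypothetical rates, chosen only to make the arithmetic easy to follow.
warehouse_rate = 36.0  # USD per hour of cluster or warehouse time
scan_price = 5.0       # USD per TB scanned

short_query = duration_cost(5, warehouse_rate)
short_query_with_minimum = duration_cost(5, warehouse_rate, minimum_seconds=60)
scanned = scan_cost(200 * 10**9, scan_price)
print(short_query, short_query_with_minimum, scanned)
```

With these placeholder numbers, a 5-second query costs $0.05 by duration alone but $0.60 once a 60-second minimum applies, which shows how much the excluded minimum charge can matter for short queries.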

The above chart shows that, from a cost perspective, Amazon EMR running Trino on top of Apache Iceberg tables was superior to other engines, in certain cases up to ten times less expensive. However, Amazon EMR setup requires additional expertise and skills compared to the no-code, no infrastructure management offered by Snowflake and Athena.

In terms of query duration, no single engine is the clear choice for all types of queries. In fact, Amazon EMR, which was the most cost-effective option, was fastest in only two of the four query types. Another interesting point is that Snowflake’s performance on top of Apache Iceberg is almost on par with data stored within Snowflake, which adds another great option for querying their Apache Iceberg data lake. The following table shows the cost and time for each query and product.

Query  | Amazon EMR Trino      | Snowflake (XL)         | Snowflake (XL) Iceberg | Athena
Query1 | $0.01 / 5 seconds     | $0.08 / 8 seconds      | $0.07 / 8 seconds      | $0.02 / 11 seconds
Query2 | $0.12 / 107 seconds   | $0.25 / 28 seconds     | $0.35 / 39 seconds     | $0.18 / 94 seconds
Query3 | $0.17 / 147 seconds   | $1.07 / 120 seconds    | $1.88 / 211 seconds    | $1.22 / 26 seconds
Query4 | $6.43 / 1,237 seconds | $11.73 / 1,324 seconds | $12.71 / 1,430 seconds | N/A
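
The graphs were normalized relative to Trino on Amazon EMR. The same normalization can be applied to the figures in the preceding table, as in the following sketch. The cost and duration values are copied from the table; the dictionary layout, the shortened engine labels, and the normalize function are assumptions made for the example.

```python
# (cost in USD, duration in seconds) per engine, copied from the table above.
results = {
    "Query1": {"EMR Trino": (0.01, 5), "Snowflake": (0.08, 8),
               "Snowflake Iceberg": (0.07, 8), "Athena": (0.02, 11)},
    "Query2": {"EMR Trino": (0.12, 107), "Snowflake": (0.25, 28),
               "Snowflake Iceberg": (0.35, 39), "Athena": (0.18, 94)},
    "Query3": {"EMR Trino": (0.17, 147), "Snowflake": (1.07, 120),
               "Snowflake Iceberg": (1.88, 211), "Athena": (1.22, 26)},
    # Athena was not measured for Query4, so it is omitted here.
    "Query4": {"EMR Trino": (6.43, 1237), "Snowflake": (11.73, 1324),
               "Snowflake Iceberg": (12.71, 1430)},
}


def normalize(results, baseline="EMR Trino"):
    """Express each engine's cost and duration as a multiple of the baseline's."""
    normalized = {}
    for query, engines in results.items():
        base_cost, base_seconds = engines[baseline]
        normalized[query] = {
            engine: (round(cost / base_cost, 2), round(seconds / base_seconds, 2))
            for engine, (cost, seconds) in engines.items()
        }
    return normalized


normalized = normalize(results)
for query, engines in normalized.items():
    print(query, engines)
```

A cost ratio above 1 means the engine was more expensive than Trino on Amazon EMR for that query, and a duration ratio below 1 means it was faster.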

Benchmarking conclusions

While every solution presents its own set of advantages and drawbacks—whether in terms of pricing, scalability, optimizing for Apache Iceberg, or the contrast between open source versus closed source—the beauty lies in not being constrained to a single choice. Embracing Apache Iceberg frees you from relying solely on a single solution. In certain scenarios where queries must be run frequently while scanning up to hundreds of gigabytes of data with an aim to evade warm-up periods and keep costs down, Athena emerged as the best choice. Conversely, when tackling hefty aggregations that demanded significant memory allocation while being mindful of cost, the preference leaned towards using Trino on Amazon EMR. Amazon EMR was significantly more cost efficient when running longer queries, because boot time cost could be discarded. Snowflake stood out as a great option when queries could be joined with other tables already residing within Snowflake. This flexibility allowed harnessing the strengths of each service, strategically applying them to suit the specific needs of various tasks without being confined to a singular solution.

In essence, the true power lies in the ability to tailor solutions to diverse requirements, using the strengths of different environments to optimize performance, cost, and efficiency.

Conclusion

Data lakes built on Amazon S3 and analytics services such as Amazon EMR and Amazon Athena, along with the open source Apache Iceberg framework, provide a scalable, cost-effective foundation for modern data architectures. Together, they enable organizations to quickly construct robust, high-performance data lakes that support ACID transactions and analytics workloads, and they offer a mature path to an enterprise-grade open data environment. The availability of managed services and open source software helps companies implement data lakes that meet their needs.

Since building a data lake solution on top of Apache Iceberg, Cloudinary has seen major enhancements. The data lake infrastructure enables Cloudinary to extend their data retention by six times while lowering the cost of storage by over 25 percent. Furthermore, query costs dropped by 25–40 percent thanks to the efficient querying capabilities of Apache Iceberg and the query optimizations provided in Athena engine version 3, which is based on Trino. The ability to retain data for longer and provide it to various stakeholders while reducing cost is a key component in allowing Cloudinary to be more data driven in their operation and decision-making processes.

A transactional data lake architecture built on Amazon S3, Apache Iceberg, and AWS analytics services can greatly enhance an organization’s data infrastructure. It allows for sophisticated analytics and machine learning, fueling innovation while keeping costs down and allowing the use of a wide range of tools and services.


About the Authors

Yonatan Dolan is a Principal Analytics Specialist at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value. Yonatan is an Apache Iceberg evangelist.

Amit Gilad is a Senior Data Engineer on the Data Infrastructure team at Cloudinary. He is currently leading the strategic transition from traditional data warehouses to a modern data lakehouse architecture, utilizing Apache Iceberg to enhance scalability and flexibility.

Alex Dickman is a Staff Data Engineer on the Data Infrastructure team at Cloudinary. He focuses on engaging with various internal teams to consolidate the team’s data infrastructure and create new opportunities for data applications, ensuring robust and scalable data solutions for Cloudinary’s diverse requirements.

Itay Takersman is a Senior Data Engineer on the Data Infrastructure team at Cloudinary. He focuses on building resilient data flows and aggregation pipelines to support Cloudinary’s data requirements.

Revolutionizing data querying: Amazon Redshift and Visual Studio Code integration

Post Syndicated from Navnit Shukla original https://aws.amazon.com/blogs/big-data/revolutionizing-data-querying-amazon-redshift-and-visual-studio-code-integration/

In today’s data-driven landscape, the efficiency and accessibility of querying tools play a crucial role in driving businesses forward. Amazon Redshift recently announced integration with Visual Studio Code (VS Code), which changes the way data practitioners work with Amazon Redshift. This integration not only unlocks new possibilities, but also tackles long-standing challenges in data analytics and query handling.

While the Amazon Redshift query editor v2 (QE v2) offers a smooth experience for data analysts and business users, many organizations have data engineers and developers who rely on VS Code as their primary development tool. Traditionally, they had to use QE v2 for their development tasks, which wasn’t optimal. This new feature resolves that issue by enabling data engineers and developers to do their development work within VS Code, enhancing their workflow efficiency.

Visual Studio Code’s integration simplifies access to database objects within Redshift data warehouses, offering an interface you’re already familiar with to run and troubleshoot your code.

By integrating Amazon Redshift provisioned clusters and Amazon Redshift Serverless with the popular and free VS Code, you can alleviate concerns about costs associated with third-party tools. This integration allows you to reduce or eliminate licensing expenses for query authoring and data visualization, because these functionalities are now available within the free VS Code editor.

The support for Amazon Redshift within VS Code marks a significant leap towards a more streamlined, cost-effective, and user-friendly data querying experience.

In this post, we explore how to kickstart your journey with Amazon Redshift using the AWS Toolkit for VS Code.

Solution overview

This post outlines the procedure for creating a secure and direct connection between your local VS Code environment and the Redshift cluster. Emphasizing both security and accessibility, this solution allows you to operate within the familiar VS Code interface while seamlessly engaging with your Redshift database.

The following diagram illustrates the VS Code connection to Amazon Redshift deployed in a private VPC.

To connect to a data warehouse using VS Code from the Toolkit, you can choose from the following methods:

  • Use a database user name and password
  • Use AWS Secrets Manager
  • Use temporary credentials (this option is only available with Amazon Redshift Provisioned cluster)

In the following sections, we show how to establish a connection with a database situated on an established provisioned cluster or a serverless data warehouse from the Toolkit.

Prerequisites

Before you begin using Amazon Redshift provisioned clusters and Amazon Redshift Serverless with the AWS Toolkit for Visual Studio Code, make sure you’ve completed the following requirements:

  1. Connect to your AWS account using the Toolkit.
  2. Set up an Amazon Redshift or Amazon Redshift Serverless data warehouse.

Establish a connection to your data warehouse using user credentials

To connect using the database user name and password, complete the following steps:

  1. Navigate through the Toolkit explorer, expanding the AWS Region housing your data warehouse (for example, US East (N. Virginia)).
  2. In the Toolkit, expand the Redshift section and choose your specific data warehouse.
  3. In the Select a Connection Type dialog, choose Database user name and password and provide the necessary information requested by the prompts.

After the Toolkit establishes the connection to your data warehouse, you will be able to view your available databases, tables, and schemas directly in the Toolkit explorer.

Establish a connection to your data warehouse using Secrets Manager

To connect using Secrets Manager, complete the following steps:

  1. Navigate through the Toolkit explorer, expanding the AWS Region housing your data warehouse.
  2. In the Toolkit, expand the Redshift section and choose your specific data warehouse.
  3. In the Select a Connection Type dialog, choose Secrets Manager and fill in the information requested at each prompt.

After the Toolkit establishes a successful connection to your data warehouse, you’ll gain visibility into your databases, tables, and schemas directly in the Toolkit explorer.

Establish a connection to your Amazon Redshift provisioned cluster using temporary credentials

To connect using temporary credentials, complete the following steps:

  1. Navigate through the Toolkit explorer, expanding the AWS Region housing your data warehouse.
  2. In the Toolkit, expand the Redshift section and choose your specific data warehouse.
  3. In the Select a Connection Type dialog, choose Temporary Credentials and fill in the information requested at each prompt.

Run SQL statements

With the connection established, the next step is to run some SQL. Complete the following steps to create and run SQL statements in your database using the Toolkit for Visual Studio Code:

  1. Navigate to the Toolkit explorer and expand Redshift, then choose the data warehouse that stores the desired database for querying.
  2. Choose Create Notebook and specify a file name and location for saving your notebook locally.
  3. Choose OK to open the notebook in your VS Code editor.
  4. Enter the following SQL statements into the VS Code editor, which will be stored in this notebook:
    create table promotion
    (
        p_promo_sk                integer               not null,
        p_promo_id                char(16)              not null,
        p_start_date_sk           integer                       ,
        p_end_date_sk             integer                       ,
        p_item_sk                 integer                       ,
        p_cost                    decimal(15,2)                 ,
        p_response_target         integer                       ,
        p_promo_name              char(50)                      ,
        p_channel_dmail           char(1)                       ,
        p_channel_email           char(1)                       ,
        p_channel_catalog         char(1)                       ,
        p_channel_tv              char(1)                       ,
        p_channel_radio           char(1)                       ,
        p_channel_press           char(1)                       ,
        p_channel_event           char(1)                       ,
        p_channel_demo            char(1)                       ,
        p_channel_details         varchar(100)                  ,
        p_purpose                 char(15)                      ,
        p_discount_active         char(1)                       ,
        primary key (p_promo_sk)
    ) diststyle all;
    
    create table reason
    (
        r_reason_sk               integer               not null,
        r_reason_id               char(16)              not null,
        r_reason_desc             char(100)                     ,
        primary key (r_reason_sk)
    ) diststyle all ;
    
    
    create table ship_mode
    (
        sm_ship_mode_sk           integer               not null,
        sm_ship_mode_id           char(16)              not null,
        sm_type                   char(30)                      ,
        sm_code                   char(10)                      ,
        sm_carrier                char(20)                      ,
        sm_contract               char(20)                      ,
        primary key (sm_ship_mode_sk)
    ) diststyle all;
    
    
    copy promotion from 's3://redshift-downloads/TPC-DS/2.13/1TB/promotion/' iam_role default gzip delimiter '|' EMPTYASNULL region 'us-east-1';
    copy reason from 's3://redshift-downloads/TPC-DS/2.13/1TB/reason/' iam_role default gzip delimiter '|' EMPTYASNULL region 'us-east-1';
    copy ship_mode from 's3://redshift-downloads/TPC-DS/2.13/1TB/ship_mode/' iam_role default gzip delimiter '|' EMPTYASNULL region 'us-east-1';
    
    
    select * from promotion limit 10;
    
    drop table promotion;
    drop table reason;
    drop table ship_mode;

  5. Choose Run All to run the SQL statements.

The output corresponding to your SQL statements will be visible below the entered statements within the editor.

Include markdown in a notebook

To include markdown in your notebook, complete the following steps:

  1. Access your notebook within the VS Code editor and choose Markdown to create a markdown cell.
  2. Enter your markdown content within the designated cell.
  3. Use the editing tools in the upper-right corner of the markdown cell to modify the markdown content as needed.

Congratulations, you have learned how to use the VS Code editor to work with your Redshift environment.

Clean up

To remove the connection, complete the following steps:

  1. In the Toolkit explorer, expand Redshift, and choose the data warehouse containing your database.
  2. Choose the database (right-click) and choose Delete Connection.

Conclusion

In this post, we explored the process of using VS Code to establish a connection with Amazon Redshift, streamlining access to database objects within Redshift data warehouses.

To learn more about Amazon Redshift, see the Getting started with Amazon Redshift guide. To learn more about writing and running SQL queries directly in VS Code, see the new AWS Toolkit for VS Code integration.


About the Author

Navnit Shukla, an AWS Specialist Solution Architect specializing in Analytics, is passionate about helping clients uncover valuable insights from their data. Leveraging his expertise, he develops inventive solutions that empower businesses to make informed, data-driven decisions. Notably, Navnit Shukla is the accomplished author of the book “Data Wrangling on AWS,” showcasing his expertise in the field.