
How the GoDaddy data platform achieved over 60% cost reduction and 50% performance boost by adopting Amazon EMR Serverless

Post Syndicated from Brandon Abear original https://aws.amazon.com/blogs/big-data/how-the-godaddy-data-platform-achieved-over-60-cost-reduction-and-50-performance-boost-by-adopting-amazon-emr-serverless/

This is a guest post co-written with Brandon Abear, Dinesh Sharma, John Bush, and Ozcan Ilikhan from GoDaddy.

GoDaddy empowers everyday entrepreneurs by providing all the help and tools to succeed online. With more than 20 million customers worldwide, GoDaddy is the place people come to name their ideas, build a professional website, attract customers, and manage their work.

At GoDaddy, we take pride in being a data-driven company. Our relentless pursuit of valuable insights from data fuels our business decisions and ensures customer satisfaction. Our commitment to efficiency is unwavering, and we’ve undertaken an exciting initiative to optimize our batch processing jobs. In this journey, we have identified a structured approach that we refer to as the seven layers of improvement opportunities. This methodology has become our guide in the pursuit of efficiency.

In this post, we discuss how we enhanced operational efficiency with Amazon EMR Serverless. We share our benchmarking results and methodology, along with insights into the cost-effectiveness of EMR Serverless vs. fixed-capacity Amazon EMR on EC2 transient clusters for our data workflows orchestrated using Amazon Managed Workflows for Apache Airflow (Amazon MWAA). We share our strategy for the adoption of EMR Serverless in areas where it excels. Our findings reveal significant benefits, including over 60% cost reduction, 50% faster Spark workloads, a remarkable five-times improvement in development and testing speed, and a significant reduction in our carbon footprint.

Background

In late 2020, GoDaddy’s data platform initiated its AWS Cloud journey, migrating an 800-node Hadoop cluster with 2.5 PB of data from its data center to EMR on EC2. This lift-and-shift approach facilitated a direct comparison between on-premises and cloud environments, ensuring a smooth transition to AWS pipelines, minimizing data validation issues and migration delays.

By early 2022, we successfully migrated our big data workloads to EMR on EC2. Using best practices learned from the AWS FinHack program, we fine-tuned resource-intensive jobs, converted Pig and Hive jobs to Spark, and reduced our batch workload spend by 22.75% in 2022. However, scalability challenges emerged due to the multitude of jobs. This prompted GoDaddy to embark on a systematic optimization journey, establishing a foundation for more sustainable and efficient big data processing.

Seven layers of improvement opportunities

In our quest for operational efficiency, we have identified seven distinct layers of opportunities for optimization within our batch processing jobs, as shown in the following figure. These layers range from precise code-level enhancements to more comprehensive platform improvements. This multi-layered approach has become our strategic blueprint in the ongoing pursuit of better performance and higher efficiency.

Seven layers of improvement opportunities

The layers are as follows:

  • Code optimization – Focuses on refining the code logic and how it can be optimized for better performance. This involves performance enhancements through selective caching, partition and projection pruning, join optimizations, and other job-specific tuning. Using AI coding solutions is also an integral part of this process.
  • Software updates – Updating to the latest versions of open source software (OSS) to capitalize on new features and improvements. For example, Adaptive Query Execution in Spark 3 brings significant performance and cost improvements.
  • Custom Spark configurations – Tuning custom Spark configurations to maximize resource utilization, memory, and parallelism. We can achieve significant improvements by right-sizing tasks, such as through spark.sql.shuffle.partitions, spark.sql.files.maxPartitionBytes, spark.executor.cores, and spark.executor.memory (see the configuration sketch after this list). However, these custom configurations might be counterproductive if they are not compatible with the specific Spark version.
  • Resource provisioning time – The time it takes to launch resources like ephemeral EMR clusters on Amazon Elastic Compute Cloud (Amazon EC2). Although some factors influencing this time are outside of an engineer’s control, identifying and addressing the factors that can be optimized can help reduce overall provisioning time.
  • Fine-grained scaling at task level – Dynamically adjusting resources such as CPU, memory, disk, and network bandwidth based on each stage’s needs within a task. The aim here is to avoid fixed cluster sizes that could result in resource waste.
  • Fine-grained scaling across multiple tasks in a workflow – Given that each task has unique resource requirements, maintaining a fixed resource size may result in under- or over-provisioning for certain tasks within the same workflow. Traditionally, the size of the largest task determines the cluster size for a multi-task workflow. However, dynamically adjusting resources across multiple tasks and steps within a workflow results in a more cost-effective implementation.
  • Platform-level enhancements – Enhancements at preceding layers can only optimize a given job or a workflow. Platform improvement aims to attain efficiency at the company level. We can achieve this through various means, such as updating or upgrading the core infrastructure, introducing new frameworks, allocating appropriate resources for each job profile, balancing service usage, optimizing the use of Savings Plans and Spot Instances, or implementing other comprehensive changes to boost efficiency across all tasks and workflows.
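
As a minimal illustration of the custom Spark configurations layer (the sketch referenced in that item), such settings might be applied when building a Spark session. The values below are placeholders, not recommendations; the right numbers depend on data volume, worker size, and Spark version.

```python
from pyspark.sql import SparkSession

# Illustrative values only; tune per job based on data volume, worker size,
# and the Spark version in use.
spark = (
    SparkSession.builder
    .appName("layer3-config-tuning-sketch")
    .config("spark.sql.shuffle.partitions", "400")              # right-size shuffle tasks
    .config("spark.sql.files.maxPartitionBytes", "268435456")   # ~256 MB input splits
    .config("spark.executor.cores", "4")                        # cores per executor
    .config("spark.executor.memory", "16g")                     # memory per executor
    .getOrCreate()
)
```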

Layers 1–3: Previous cost reductions

After we migrated from on premises to AWS Cloud, we primarily focused our cost-optimization efforts on the first three layers shown in the diagram. By transitioning our most costly legacy Pig and Hive pipelines to Spark and optimizing Spark configurations for Amazon EMR, we achieved significant cost savings.

For example, a legacy Pig job took 10 hours to complete and ranked among the top 10 most expensive EMR jobs. Upon reviewing TEZ logs and cluster metrics, we discovered that the cluster was vastly over-provisioned for the data volume being processed and remained under-utilized for most of the runtime. Rewriting the job from Pig to Spark proved far more efficient. Although no automated tools were available for the conversion, the following manual optimizations were made:

  • Reduced unnecessary disk writes, saving serialization and deserialization time (Layer 1)
  • Replaced Airflow task parallelization with Spark, simplifying the Airflow DAG (Layer 1)
  • Eliminated redundant Spark transformations (Layer 1)
  • Upgraded from Spark 2 to 3, using Adaptive Query Execution (Layer 2)
  • Addressed skewed joins and optimized smaller dimension tables (Layer 3), as sketched after this list
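
As a hedged sketch of the last two items above (not the original job’s code), enabling Spark 3 Adaptive Query Execution and broadcasting a small dimension table could look like the following; the S3 paths and join column are hypothetical.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = (
    SparkSession.builder
    .appName("skew-and-dimension-join-sketch")
    # Spark 3 AQE can coalesce partitions and split skewed shuffle partitions at runtime
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .getOrCreate()
)

facts = spark.read.parquet("s3://example-bucket/facts/")        # hypothetical path
dims = spark.read.parquet("s3://example-bucket/dimensions/")    # small dimension table

# Broadcasting the small table avoids shuffling the large fact table for the join
joined = facts.join(broadcast(dims), on="dim_id", how="left")
joined.write.mode("overwrite").parquet("s3://example-bucket/output/")
```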

As a result, job cost decreased by 95%, and job completion time was reduced to 1 hour. However, this approach was labor-intensive and not scalable for numerous jobs.

Layers 4–6: Find and adopt the right compute solution

In late 2022, following our significant accomplishments in optimization at the previous levels, our attention moved towards enhancing the remaining layers.

Understanding the state of our batch processing

We use Amazon MWAA to orchestrate our data workflows in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks referred to as workflows. In this post, the terms workflow and job are used interchangeably, referring to the Directed Acyclic Graphs (DAGs) consisting of tasks orchestrated by Amazon MWAA. Each workflow’s DAG contains sequential tasks, parallel tasks, or a combination of both between create_emr and terminate_emr tasks, and runs on a transient EMR cluster with fixed compute capacity throughout the workflow run (a minimal sketch of this pattern appears after the run-time categories below). Even after optimizing a portion of our workload, we still had numerous non-optimized workflows that were under-utilized due to over-provisioning of compute resources based on the most resource-intensive task in the workflow, as shown in the following figure.

This highlighted the impracticality of static resource allocation and led us to recognize the necessity of a dynamic resource allocation (DRA) system. Before proposing a solution, we gathered extensive data to thoroughly understand our batch processing. Analyzing the cluster step time, excluding provisioning and idle time, revealed significant insights: a right-skewed distribution with over half of the workflows completing in 20 minutes or less and only 10% taking more than 60 minutes. This distribution guided our choice of a fast-provisioning compute solution, dramatically reducing workflow runtimes. The following diagram illustrates step times (excluding provisioning and idle time) of EMR on EC2 transient clusters in one of our batch processing accounts.

Furthermore, based on the step time (excluding provisioning and idle time) distribution of the workflows, we categorized our workflows into three groups:

  • Quick run – Lasting 20 minutes or less
  • Medium run – Lasting between 20–60 minutes
  • Long run – Exceeding 60 minutes, often spanning several hours or more
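
For reference, the transient-cluster pattern sketched earlier (create_emr, job steps, terminate_emr on a fixed-capacity cluster) can be expressed as a minimal Airflow DAG using the community Amazon provider operators; the cluster configuration and step definition below are hypothetical placeholders rather than GoDaddy’s DAG code.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrAddStepsOperator,
    EmrCreateJobFlowOperator,
    EmrTerminateJobFlowOperator,
)

# Hypothetical placeholders: a real job flow config would define instance types,
# counts, release label, and so on -- fixed for the whole workflow run.
JOB_FLOW_OVERRIDES = {"Name": "transient-workflow-cluster"}
SPARK_STEPS = [{
    "Name": "spark-task",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": ["spark-submit", "s3://example-bucket/jobs/task.py"],
    },
}]

with DAG(
    dag_id="transient_emr_workflow",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    create_emr = EmrCreateJobFlowOperator(
        task_id="create_emr",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
    )
    run_steps = EmrAddStepsOperator(
        task_id="run_steps",
        job_flow_id=create_emr.output,   # cluster ID returned by create_emr
        steps=SPARK_STEPS,
    )
    terminate_emr = EmrTerminateJobFlowOperator(
        task_id="terminate_emr",
        job_flow_id=create_emr.output,
        trigger_rule="all_done",         # tear the cluster down even if a step fails
    )
    create_emr >> run_steps >> terminate_emr
```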

Another factor we needed to consider was the extensive use of transient clusters for reasons such as security, job and cost isolation, and purpose-built clusters. Additionally, there was a significant variation in resource needs between peak hours and periods of low utilization.

Instead of fixed-size clusters, we could potentially use managed scaling on EMR on EC2 to achieve some cost benefits. However, migrating to EMR Serverless appeared to be a more strategic direction for our data platform. In addition to potential cost benefits, EMR Serverless offers additional advantages such as a one-click upgrade to the newest Amazon EMR versions, a simplified operational and debugging experience, and automatic upgrades to the latest generations upon rollout. These features collectively simplify the process of operating a platform at larger scale.

Evaluating EMR Serverless: A case study at GoDaddy

EMR Serverless is a serverless option in Amazon EMR that eliminates the complexities of configuring, managing, and scaling clusters when running big data frameworks like Apache Spark and Apache Hive. With EMR Serverless, businesses can enjoy numerous benefits, including cost-effectiveness, faster provisioning, simplified developer experience, and improved resilience to Availability Zone failures.

Recognizing the potential of EMR Serverless, we conducted an in-depth benchmark study using real production workflows. The study aimed to assess EMR Serverless performance and efficiency while also creating an adoption plan for large-scale implementation. The findings were highly encouraging, showing EMR Serverless can effectively handle our workloads.

Benchmarking methodology

We split our data workflows into three categories based on total step time (excluding provisioning and idle time): quick run (0–20 minutes), medium run (20–60 minutes), and long run (over 60 minutes). We analyzed the impact of the EMR deployment type (Amazon EC2 vs. EMR Serverless) on two key metrics: cost-efficiency and total runtime speedup, which served as our overall evaluation criteria. Although we did not formally measure ease of use and resiliency, these factors were considered throughout the evaluation process.

The high-level steps to assess the environment are as follows:

  1. Prepare the data and environment:
    1. Choose three to five random production jobs from each job category.
    2. Implement required adjustments to prevent interference with production.
  2. Run tests:
    1. Run scripts over several days or through multiple iterations to gather precise and consistent data points.
    2. Perform tests using EMR on EC2 and EMR Serverless.
  3. Validate data and test runs:
    1. Validate input and output datasets, partitions, and row counts to ensure identical data processing.
  4. Gather metrics and analyze results:
    1. Gather relevant metrics from the tests.
    2. Analyze results to draw insights and conclusions.

Benchmark results

Our benchmark results showed significant enhancements across all three job categories for both runtime speedup and cost-efficiency. The improvements were most pronounced for quick jobs, directly resulting from faster startup times. For instance, a data workflow that takes 20 minutes (including cluster provisioning and shutdown) on an EMR on EC2 transient cluster with fixed compute capacity finishes in 10 minutes on EMR Serverless, providing a shorter runtime with cost benefits. Overall, the shift to EMR Serverless delivered substantial performance improvements and cost reductions at scale across job brackets, as seen in the following figure.

Historically, we devoted more time to tuning our long-run workflows. Interestingly, we discovered that the existing custom Spark configurations for these jobs did not always translate well to EMR Serverless. In cases where the improvements were insignificant, a common approach was to discard the previous Spark configurations related to executor cores. By allowing EMR Serverless to autonomously manage these Spark configurations, we often observed improved outcomes. The following graph shows the average runtime and cost improvement per job when comparing EMR Serverless to EMR on EC2.

Per Job Improvement
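
As a hedged illustration of letting the service manage executor sizing (not GoDaddy’s actual job definitions), the following boto3 sketch submits a Spark job to EMR Serverless without executor-core or executor-count overrides; the application ID, role ARN, and script path are placeholders.

```python
import boto3

emr_serverless = boto3.client("emr-serverless")

# Placeholder identifiers; supply your own application, role, and entry point.
response = emr_serverless.start_job_run(
    applicationId="00example1234",
    executionRoleArn="arn:aws:iam::123456789012:role/example-emr-serverless-role",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://example-bucket/jobs/task.py",
            # No executor-core or max-executor overrides: EMR Serverless manages
            # worker sizing and dynamic allocation on its own.
            "sparkSubmitParameters": "--conf spark.sql.shuffle.partitions=400",
        }
    },
)
print(response["jobRunId"])
```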

The following table shows a sample comparison of results for the same workflow running on different deployment options of Amazon EMR (EMR on EC2 and EMR Serverless).

Metric                        EMR on EC2 (Average)         EMR Serverless (Average)    EMR on EC2 vs. EMR Serverless
Total Run Cost ($)            $5.82                        $2.60                       55%
Total Run Time (Minutes)      53.40                        39.40                       26%
Provisioning Time (Minutes)   10.20                        0.05                        .
Provisioning Cost ($)         $1.19                        .                           .
Steps Time (Minutes)          38.20                        39.16                       -3%
Steps Cost ($)                $4.30                        .                           .
Idle Time (Minutes)           4.80                         .                           .
EMR Release Label             emr-6.9.0                    .                           .
Hadoop Distribution           Amazon 3.3.3                 .                           .
Spark Version                 Spark 3.3.0                  .                           .
Hive/HCatalog Version         Hive 3.1.3, HCatalog 3.1.3   .                           .
Job Type                      Spark                        .                           .

AWS Graviton2 on EMR Serverless performance evaluation

After seeing compelling results with EMR Serverless for our workloads, we decided to further analyze the performance of the AWS Graviton2 (arm64) architecture within EMR Serverless. AWS had benchmarked Spark workloads on Graviton2 EMR Serverless using the TPC-DS 3TB scale, showing a 27% overall price-performance improvement.

To better understand the integration benefits, we ran our own study using GoDaddy’s production workloads on a daily schedule and observed an impressive 23.8% price-performance enhancement across a range of jobs when using Graviton2. For more details about this study, see GoDaddy benchmarking results in up to 24% better price-performance for their Spark workloads with AWS Graviton2 on Amazon EMR Serverless.
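
For reference, the Graviton2 (arm64) architecture is selected when the EMR Serverless application is created; the following boto3 sketch shows the idea, with a hypothetical application name and the release label used in the earlier comparison.

```python
import boto3

emr_serverless = boto3.client("emr-serverless")

# Hypothetical application; architecture="ARM64" requests Graviton2-based workers.
app = emr_serverless.create_application(
    name="example-graviton-spark-app",
    releaseLabel="emr-6.9.0",
    type="SPARK",
    architecture="ARM64",
)
print(app["applicationId"])
```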

Adoption strategy for EMR Serverless

We strategically implemented a phased rollout of EMR Serverless via deployment rings, enabling systematic integration. This gradual approach let us validate improvements and halt further adoption of EMR Serverless if needed. It served both as a safety net to catch issues early and as a means to refine our infrastructure. The process mitigated the impact of change, kept operations running smoothly, and built expertise across our Data Engineering and DevOps teams. Additionally, it fostered tight feedback loops, allowing prompt adjustments and ensuring efficient EMR Serverless integration.

We divided our workflows into three main adoption groups, as shown in the following image:

  • Canaries – This group aids in detecting and resolving any potential problems early in the deployment stage.
  • Early adopters – This is the second batch of workflows that adopt the new compute solution after initial issues have been identified and rectified by the canaries group.
  • Broad deployment rings – The largest group, representing the wide-scale deployment of the solution. These workflows are deployed after successful testing and implementation in the previous two groups.

Rings

We further broke down these workflows into granular deployment rings to adopt EMR Serverless, as shown in the following table.

Ring #   Name             Details
Ring 0   Canary           Low adoption risk jobs that are expected to yield some cost saving benefits
Ring 1   Early Adopters   Low-risk quick-run Spark jobs that are expected to yield high gains
Ring 2   Quick-run        Rest of the quick-run (step_time <= 20 min) Spark jobs
Ring 3   LargerJobs_EZ    High potential gain, easy-move, medium-run and long-run Spark jobs
Ring 4   LargerJobs       Rest of the medium-run and long-run Spark jobs with potential gains
Ring 5   Hive             Hive jobs with potentially higher cost savings
Ring 6   Redshift_EZ      Easy-migration Redshift jobs that suit EMR Serverless
Ring 7   Glue_EZ          Easy-migration Glue jobs that suit EMR Serverless

Production adoption results summary

The encouraging benchmarking and canary adoption results generated considerable interest in wider EMR Serverless adoption at GoDaddy. To date, the EMR Serverless rollout remains underway. Thus far, it has reduced costs by 62.5% and accelerated total batch workflow completion by 50.4%.

Based on preliminary benchmarks, our team expected substantial gains for quick jobs. To our surprise, actual production deployments surpassed projections, averaging 64.4% faster vs. 42% projected, and 71.8% cheaper vs. 40% predicted.

Remarkably, long-running jobs also saw significant performance improvements due to the rapid provisioning of EMR Serverless and aggressive scaling enabled by dynamic resource allocation. We observed substantial parallelization during high-resource segments, resulting in a 40.5% faster total runtime compared to traditional approaches. The following chart illustrates the average enhancements per job category.

Prod Jobs Savings

Additionally, we observed the highest degree of dispersion for speed improvements within the long-run job category, as shown in the following box-and-whisker plot.

Whisker Plot

Sample workflows that adopted EMR Serverless

For a large workflow migrated to EMR Serverless, comparing 3-week averages pre- and post-migration revealed impressive cost savings: a 75.30% decrease based on retail pricing, along with a 10% improvement in total runtime, boosting operational efficiency. The following graph illustrates the cost trend.

Although quick-run jobs realized minimal absolute cost reductions per job, they delivered the most significant percentage cost savings. With thousands of these workflows running daily, the accumulated savings are substantial. The following graph shows the cost trend for a small workload migrated from EMR on EC2 to EMR Serverless. Comparing 3-week pre- and post-migration averages revealed a remarkable 92.43% cost savings on retail on-demand pricing, alongside an 80.6% acceleration in total runtime.

Sample workflows adopted EMR Serverless 2

Layer 7: Platform-wide improvements

We aim to revolutionize compute operations at GoDaddy, providing simplified yet powerful solutions for all users with our Intelligent Compute Platform. Using AWS compute solutions such as EMR Serverless and EMR on EC2, the platform provides optimized runs of data processing and machine learning (ML) workloads. An ML-powered job broker intelligently determines when and how to run jobs based on various parameters, while still allowing power users to customize. Additionally, an ML-powered compute resource manager pre-provisions resources based on load and historical data, providing efficient, fast provisioning at optimum cost. Intelligent compute empowers users with out-of-the-box optimization, catering to diverse personas without limiting power users.

The following diagram shows a high-level illustration of the intelligent compute architecture.

Insights and recommended best practices

The following section discusses the insights we’ve gathered and the recommended best practices we’ve developed during our preliminary and wider adoption stages.

Infrastructure preparation

Although EMR Serverless is a deployment method within EMR, it requires some infrastructure preparedness to optimize its potential. Consider the following requirements and practical guidance on implementation:

  • Use large subnets across multiple Availability Zones – When running EMR Serverless workloads within your VPC, make sure the subnets span across multiple Availability Zones and are not constrained by IP addresses. Refer to Configuring VPC access and Best practices for subnet planning for details.
  • Modify maximum concurrent vCPU quota – For extensive compute requirements, it is recommended to increase your maximum concurrent vCPUs per account service quota.
  • Amazon MWAA version compatibility – When adopting EMR Serverless, GoDaddy’s decentralized Amazon MWAA ecosystem for data pipeline orchestration created compatibility issues from disparate AWS Providers versions. Directly upgrading Amazon MWAA was more efficient than updating numerous DAGs. We facilitated adoption by upgrading Amazon MWAA instances ourselves, documenting issues, and sharing findings and effort estimates for accurate upgrade planning.
  • GoDaddy EMR operator – To streamline migrating numerous Airflow DAGs from EMR on EC2 to EMR Serverless, we developed custom operators adapting existing interfaces. This allowed seamless transitions while retaining familiar tuning options. Data engineers could easily migrate pipelines with simple find-replace imports and immediately use EMR Serverless (see the sketch after this list).
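
GoDaddy’s custom operator is not public; as a rough reference for the kind of interface involved, the community Amazon provider package for Airflow ships an EmrServerlessStartJobOperator that a migrated task might use, sketched below with placeholder IDs and paths.

```python
from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator

# Placeholder identifiers; a migrated DAG task swaps the EMR on EC2 operators
# for an EMR Serverless job run while keeping familiar Spark tuning options.
run_spark_task = EmrServerlessStartJobOperator(
    task_id="run_spark_task",
    application_id="00example1234",
    execution_role_arn="arn:aws:iam::123456789012:role/example-emr-serverless-role",
    job_driver={
        "sparkSubmit": {
            "entryPoint": "s3://example-bucket/jobs/task.py",
        }
    },
    configuration_overrides={
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {"logUri": "s3://example-bucket/logs/"}
        }
    },
)
```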

Unexpected behavior mitigation

The following are unexpected behaviors we ran into and what we did to mitigate them:

  • Spark DRA aggressive scaling – For some jobs (8.33% of initial benchmarks, 13.6% of production), cost increased after migrating to EMR Serverless. This was due to Spark DRA excessively assigning new workers briefly, prioritizing performance over cost. To counteract this, we set maximum executor thresholds by adjusting spark.dynamicAllocation.maxExecutors, effectively limiting how aggressively EMR Serverless scales. When migrating from EMR on EC2, we suggest observing the max core count in the Spark History UI to replicate similar compute limits in EMR Serverless, such as --conf spark.executor.cores and --conf spark.dynamicAllocation.maxExecutors.
  • Managing disk space for large-scale jobs – When transitioning jobs that process large data volumes with substantial shuffles and significant disk requirements to EMR Serverless, we recommend configuring spark.emr-serverless.executor.disk by referring to existing Spark job metrics. Furthermore, configurations like spark.executor.cores combined with spark.emr-serverless.executor.disk and spark.dynamicAllocation.maxExecutors allow control over the underlying worker size and total attached storage when advantageous. For example, a shuffle-heavy job with relatively low disk usage may benefit from using a larger worker to increase the likelihood of local shuffle fetches. A configuration sketch follows this list.
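
The configuration sketch referenced in the list above might look like the following; the values and entry point are hypothetical and should be derived from your own Spark History UI metrics.

```python
# Hypothetical limits: cap scaling aggression and size worker disk to match
# what the original EMR on EC2 job actually used.
spark_submit_parameters = " ".join([
    "--conf spark.executor.cores=4",
    "--conf spark.dynamicAllocation.maxExecutors=100",
    "--conf spark.emr-serverless.executor.disk=100G",
])

job_driver = {
    "sparkSubmit": {
        "entryPoint": "s3://example-bucket/jobs/shuffle_heavy_task.py",
        "sparkSubmitParameters": spark_submit_parameters,
    }
}
```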

Conclusion

As discussed in this post, our experiences with adopting EMR Serverless on arm64 have been overwhelmingly positive. The impressive results we’ve achieved, including a 60% reduction in cost, 50% faster runs of batch Spark workloads, and an astounding five-times improvement in development and testing speed, speak volumes about the potential of this technology. Furthermore, our current results suggest that by widely adopting Graviton2 on EMR Serverless, we could potentially reduce the carbon footprint by up to 60% for our batch processing.

However, it’s crucial to understand that these results are not a one-size-fits-all scenario. The enhancements you can expect are subject to factors including, but not limited to, the specific nature of your workflows, cluster configurations, resource utilization levels, and fluctuations in computational capacity. Therefore, we strongly advocate for a data-driven, ring-based deployment strategy when considering the integration of EMR Serverless, which can help optimize its benefits to the fullest.

Special thanks to Mukul Sharma and Boris Berlin for their contributions to benchmarking. Many thanks to Travis Muhlestein (CDO), Abhijit Kundu (VP Eng), Vincent Yung (Sr. Director Eng.), and Wai Kin Lau (Sr. Director Data Eng.) for their continued support.


About the Authors

Brandon Abear is a Principal Data Engineer in the Data & Analytics (DnA) organization at GoDaddy. He enjoys all things big data. In his spare time, he enjoys traveling, watching movies, and playing rhythm games.

Dinesh Sharma is a Principal Data Engineer in the Data & Analytics (DnA) organization at GoDaddy. He is passionate about user experience and developer productivity, always looking for ways to optimize engineering processes and save costs. In his spare time, he loves reading and is an avid manga fan.

John Bush is a Principal Software Engineer in the Data & Analytics (DnA) organization at GoDaddy. He is passionate about making it easier for organizations to manage data and use it to drive their businesses forward. In his spare time, he loves hiking, camping, and riding his ebike.

Ozcan Ilikhan is the Director of Engineering for the Data and ML Platform at GoDaddy. He has over two decades of multidisciplinary leadership experience, spanning startups to global enterprises. He has a passion for leveraging data and AI in creating solutions that delight customers, empower them to achieve more, and boost operational efficiency. Outside of his professional life, he enjoys reading, hiking, gardening, volunteering, and embarking on DIY projects.

Harsh Vardhan is an AWS Solutions Architect, specializing in big data and analytics. He has over 8 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Case of success: phygital environment monitoring with Zabbix

Post Syndicated from Aurea Araujo original https://blog.zabbix.com/case-of-success-phygital-environment-monitoring-with-zabbix/27108/

When retail needs involve monitoring diverse and complex environments, with digital and physical operations, the tool chosen to meet those needs must be versatile, scalable and capable of collecting and analyzing data to generate insights for managers and support decision-making.

With this in mind, Unirede – a Zabbix Premium Partner – developed a use case consisting of monitoring a client in the retail segment, using Zabbix as the main tool for data collection, consolidation and event management.

The result: a reduction of up to 70% in operational costs, along with other benefits related to data-driven decision-making and automation at the level of the technology environment for rapid responses to incidents.

Continue reading to understand, in detail, how monitoring can support retail needs, based on this case of success.

Retail needs

Currently, stores and brands that offer an omnichannel experience are standing out in the market. This means that they are available 24 hours a day, 7 days a week, not only in physical spaces (such as the stores themselves), but also digitally (through e-commerce and mobile app operations). These retailers also have critical operations in distribution centers that operate without interruption.

As a result, the environment to be monitored becomes what we call phygital – both physical and digital at the same time. The origins of this concept are closely linked to the Internet and global digitalization.

With this, customers can choose to buy from home, on their cell phone, wherever they are. However, if necessary, they can find the support they need in physical stores, with the same rules and prices across all channels.

Therefore, retailers need to ensure that the operation is able to deliver, full-time, a consistent customer experience on any channel, mitigating or preventing unavailability and loss of service performance. Additionally, they need to provide support to requests for help that may arise from managers who are responsible for the company’s results.

And this is not limited to just one type of retail. Segments such as supermarkets, fast fashion, specialists, fast food and pharmaceutical, among others, can benefit from data monitoring to improve the work carried out in activities such as:

  • Understanding the purchasing journey of omnibuyer customers (on-line/off-line);
  • Complete monitoring of user experience;
  • Maximizing the operation of distribution centers;
  • Monitoring points of sale (POS);
  • Developing technical and executive dashboards with the main KPIs of the business;
  • Reports with information for making decisions in real time.

So, through monitoring with Zabbix, it is possible to collect data from different points, organize these data as information in visual dashboards and generate knowledge to improve internal and external customer service from end to end.

How monitoring with Zabbix works

We talk about the benefits and needs regarding retailers, but we also need to explain how monitoring with Zabbix works in this type of environment.

Beginning with the basics: Zabbix collects, categorizes, analyzes and generates information from data.

This process is divided into four stages:

  • Data collection;
  • Problem detection;
  • Visualization;
  • Reaction.
Zabbix activity cycle

In the first stage, Zabbix captures data from different sources, which can be cloud systems, containers, networks, servers, databases, APIs, stocks, payment methods, applications, services and the Internet of Things. At this stage, there is a lot of flexibility in the tool itself, and it is also possible to create completely customized collection methods.

The data are encrypted, as Zabbix follows the premise of Security by Design, and they are analyzed in a processing stage to detect possible problems or behaviors that the business wants to be detected.

At this stage, data processing categorizes information into events by severity, indicates the root cause of the potential problem or anomaly, correlates these events based on predefinitions established by system administrators or business managers, begins self-remediation of this problem, and creates predictions based on metrics behaviors so that the business is ready and prepared to deal with events that have not yet occurred.

Afterwards, the information generated is allocated to dashboards for better visualization and, consequently, administrators choose how to react to what is shown.

Reactions can take the form of alerts via message, e-mail and applications, by generating tickets to the support team, by establishing a connection to other applications and systems, and by automating problem solving – or self-healing.

Main on-line and off-line retail indicators

By monitoring systems and the main resources of the retail environment, in addition to ensuring better availability and performance, it is possible to extract critical indicators for your business in real time.

There are indicators that are found both in physical and digital retail operations. With Zabbix, it is possible to collect and measure each one of these indicators, such as:

  • Gross sales;
  • Average ticket;
  • Sales by product category;
  • Sales by payment method;
  • Number of sales;
  • Accumulated sales in a given period;
  • Inventory value;
  • Sales per square meter (m²);
  • Sales per employee;
  • Year-over-Year Sales (YoY);
  • Goals achieved;
  • Conversion rate (from visitor to customer);
  • Traffic origin channels;
  • Time spent in e-commerce;
  • New visitors vs. returning visitors;
  • Cart abandonment.

By analyzing the elements mentioned above, also through monitoring, it is possible to understand how on-line sales perform compared to off-line sales, helping business owners decide which of these channels (or all of them) should receive more or less investment to generate more revenue.

A little earlier, we mentioned automating manual processes.

In retail, this can happen with the discovery of events and the indication of root causes, such as identifying the unavailability of a service or component that impacts the proper operation of a given system. Based on rules defined in Zabbix, a self-recovery command can then be triggered without human intervention, as in the following example:

Example of self-healing with Zabbix, used by Unirede.

What are the benefits of monitoring for retailers?

How can monitoring become essential for the digital transformation of retailers?

In order to do this, we need to understand the benefits of collecting and analyzing data with Zabbix.

The first and most objective one is the monitoring of support services, both in physical and digital operations. Here, we are talking about networks, connections, and IT infrastructure in general.

But there is also monitoring distribution centers in order to optimize supply chains, and capturing data from stores, points of sale, data centers and clouds.

With this duly adjusted, we move on to how the monitoring and sustainment of basic services helps retailers to have a better view of environments, analyzing performance indicators in real time and managing SLAs.

The result of a monitoring system with Zabbix in retail is having operations focused on customer experience, ensuring cost reductions and gains in operational efficiency.

Lessons learned from retailer monitoring

With so many possibilities and advantages resulting from using Zabbix in retail, it is difficult to choose where to start.

We need to bear in mind that, when implementing Zabbix in this area, it is important to focus on what is essential, that is, monitoring only what is necessary, instead of monitoring data that will not result in any type of action or analysis in case of an event. Avoid standard templates without the necessary adjustments to meet the specificities of your environment and the analysis practices your business requires.

Automating as much as possible is also a crucial practice, as it allows the team to dedicate more time to strategic activities in the area, thus spending less time dealing with incidents and adding new hosts.

And, of course, even if it is possible to have an integration with other tools, it is worth carrying out a thorough review of existing monitoring efforts in other tools to avoid generating events that are irrelevant, that is, that do not require any type of action by the team. This approach ensures that integration is smooth and does not compromise the effectiveness of the system and operations by generating excessive or unnecessary events.

Last but not least: it is important to recognize the essential and crucial role of the people who use the tool. They not only operate Zabbix, but also play an active role in the development and continuous evolution of business monitoring efforts.

By giving these users a voice and promoting training sessions, your company can invest in more meaningful collaborations, contributing to the continuous adaptation of Zabbix to the specific needs of the retail segment.

About Unirede

Unirede is a technology company with roots in the State of Rio Grande do Sul and headquartered in Porto Alegre. It was created in 1999 and is dedicated to providing its clients with effective consulting services to improve business performance. Its activities aim to increase productivity, minimize downtime, and drive the integration of technological innovations through managed services.

With a philosophy centered on simplicity, Unirede focuses on human relationships, both internally and with clients. There is a conscious effort to not only provide services, but also to establish relationships, favoring the delivery of intelligent solutions that add value to clients.

Unirede has achieved a level of excellence and commitment to results that has resulted in the establishment of strategic partnerships with technology market leaders. It stands out as the first Zabbix Premium Partner in Latin America, since 2008, and was the first Zabbix Training Partner in the world, in 2012.

Find out more about the Official Zabbix Partner Program.


How Zoom implemented streaming log ingestion and efficient GDPR deletes using Apache Hudi on Amazon EMR

Post Syndicated from Sekar Srinivasan original https://aws.amazon.com/blogs/big-data/how-zoom-implemented-streaming-log-ingestion-and-efficient-gdpr-deletes-using-apache-hudi-on-amazon-emr/

In today’s digital age, logging is a critical aspect of application development and management, but efficiently managing logs while complying with data protection regulations can be a significant challenge. Zoom, in collaboration with the AWS Data Lab team, developed an innovative architecture to overcome these challenges and streamline their logging and record deletion processes. In this post, we explore the architecture and the benefits it provides for Zoom and its users.

Application log challenges: Data management and compliance

Application logs are an essential component of any application; they provide valuable information about the usage and performance of the system. These logs are used for a variety of purposes, such as debugging, auditing, performance monitoring, business intelligence, system maintenance, and security. However, although these application logs are necessary for maintaining and improving the application, they also pose an interesting challenge. These application logs may contain personally identifiable data, such as user names, email addresses, IP addresses, and browsing history, which creates a data privacy concern.

Laws such as the General Data Protection Regulation (GDPR) and the California Consumer Privacy Act (CCPA) require organizations to retain application logs for a specific period of time. The exact length of time required for data storage varies depending on the specific regulation and the type of data being stored. The reason for these data retention periods is to ensure that companies aren’t keeping personal data longer than necessary, which could increase the risk of data breaches and other security incidents. This also helps ensure that companies aren’t using personal data for purposes other than those for which it was collected, which could be a violation of privacy laws. These laws also give individuals the right to request the deletion of their personal data, also known as the “right to be forgotten.” Individuals have the right to have their personal data erased, without undue delay.

So, on one hand, organizations need to collect application log data to ensure the proper functioning of their services, and keep the data for a specific period of time. But on the other hand, they may receive requests from individuals to delete their personal data from the logs. This creates a balancing act for organizations because they must comply with both data retention and data deletion requirements.

This issue becomes increasingly challenging for larger organizations that operate in multiple countries and states, because each country and state may have their own rules and regulations regarding data retention and deletion. For example, the Personal Information Protection and Electronic Documents Act (PIPEDA) in Canada and the Australian Privacy Act in Australia are similar laws to GDPR, but they may have different retention periods or different exceptions. Therefore, organizations big or small must navigate this complex landscape of data retention and deletion requirements, while also ensuring that they are in compliance with all applicable laws and regulations.

Zoom’s initial architecture

During the COVID-19 pandemic, the use of Zoom skyrocketed as more and more people were asked to work and attend classes from home. The company had to rapidly scale its services to accommodate the surge and worked with AWS to deploy capacity across most Regions globally. With a sudden increase in the large number of application endpoints, they had to rapidly evolve their log analytics architecture and worked with the AWS Data Lab team to quickly prototype and deploy an architecture for their compliance use case.

At Zoom, the data ingestion throughput and performance needs are very stringent. Data had to be ingested from several thousand application endpoints that produced over 30 million messages every minute, resulting in over 100 TB of log data per day. The existing ingestion pipeline consisted of writing the data to Apache Hadoop HDFS storage through Apache Kafka first and then running daily jobs to move the data to persistent storage. This took several hours while also slowing the ingestion and creating the potential for data loss. Scaling the architecture was also an issue because HDFS data would have to be moved around whenever nodes were added or removed. Furthermore, transactional semantics on billions of records were necessary to help meet compliance-related data delete requests, and the existing architecture of daily batch jobs was operationally inefficient.

It was at this time, through conversations with the AWS account team, that the AWS Data Lab team got involved to assist in building a solution for Zoom’s hyper-scale.

Solution overview

The AWS Data Lab offers accelerated, joint engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data, analytics, artificial intelligence (AI), machine learning (ML), serverless, and container modernization initiatives. The Data Lab has three offerings: the Build Lab, the Design Lab, and Resident Architect. During the Build and Design Labs, AWS Data Lab Solutions Architects and AWS experts supported Zoom specifically by providing prescriptive architectural guidance, sharing best practices, building a working prototype, and removing technical roadblocks to help meet their production needs.

Zoom and the AWS team (collectively referred to as “the team” going forward) identified two major workflows for data ingestion and deletion.

Data ingestion workflow

The following diagram illustrates the data ingestion workflow.

Data Ingestion Workflow

To test the ingestion workflow, the team needed to quickly populate millions of Kafka messages in the dev/test environment. To expedite the process, we (the team) opted to use Amazon Managed Streaming for Apache Kafka (Amazon MSK), which makes it simple to ingest and process streaming data in real time, and we were up and running in under a day.

To generate test data that resembled production data, the AWS Data Lab team created a custom Python script that evenly populated over 1.2 billion messages across several Kafka partitions. To match the production setup in the development account, we had to increase the cloud quota limit via a support ticket.

We used Amazon MSK and the Spark Structured Streaming capability in Amazon EMR to ingest and process the incoming Kafka messages with high throughput and low latency. Specifically, we inserted the data from the source into EMR clusters at a maximum incoming rate of 150 million Kafka messages every 5 minutes, with each Kafka message holding 7–25 log data records.

To store the data, we chose to use Apache Hudi as the table format. We opted for Hudi because it’s an open-source data management framework that provides record-level insert, update, and delete capabilities on top of an immutable storage layer like Amazon Simple Storage Service (Amazon S3). Additionally, Hudi is optimized for handling large datasets and works well with Spark Structured Streaming, which was already being used at Zoom.

After 150 million messages were buffered, we processed the messages using Spark Structured Streaming on Amazon EMR and wrote the data into Amazon S3 in Apache Hudi-compatible format every 5 minutes. We first flattened the message array, creating a single record from the nested array of messages. Then we added a unique key, known as the Hudi record key, to each message. This key allows Hudi to perform record-level insert, update, and delete operations on the data. We also extracted the field values, including the Hudi partition keys, from incoming messages.
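
Zoom’s production code is not shown in the post; the following PySpark sketch illustrates the general shape of this ingestion step under simplified assumptions. The broker, topic, bucket, message schema, and field names are hypothetical, and the Hudi options are common ones rather than Zoom’s exact configuration.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, explode, from_json
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

spark = SparkSession.builder.appName("kafka-to-hudi-ingest-sketch").getOrCreate()

# Hypothetical schema: each Kafka message wraps a nested array of log records.
schema = StructType([
    StructField("records", ArrayType(StructType([
        StructField("event_id", StringType()),
        StructField("event_time", StringType()),
        StructField("event_date", StringType()),
        StructField("payload", StringType()),
    ])))
])

raw = (spark.readStream.format("kafka")
       .option("kafka.bootstrap.servers", "example-msk-broker:9092")  # placeholder
       .option("subscribe", "example-log-topic")
       .load())

# Flatten the nested array into one row per log record and derive a Hudi record key.
records = (raw.select(from_json(col("value").cast("string"), schema).alias("msg"))
              .select(explode(col("msg.records")).alias("r"))
              .select("r.*")
              .withColumn("record_key", concat_ws("-", col("event_id"), col("event_time"))))

hudi_options = {
    "hoodie.table.name": "application_logs",
    "hoodie.datasource.write.recordkey.field": "record_key",
    "hoodie.datasource.write.partitionpath.field": "event_date",
    "hoodie.datasource.write.precombine.field": "event_time",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
}

query = (records.writeStream.format("hudi")
         .options(**hudi_options)
         .option("checkpointLocation", "s3://example-bucket/checkpoints/logs/")
         .outputMode("append")
         .trigger(processingTime="5 minutes")   # mirrors the 5-minute micro-batches
         .start("s3://example-bucket/hudi/application_logs/"))
query.awaitTermination()
```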

This architecture allowed end-users to query the data stored in Amazon S3 using Amazon Athena with the AWS Glue Data Catalog or using Apache Hive and Presto.

Data deletion workflow

The following diagram illustrates the data deletion workflow.

Data Deletion Workflow

Our architecture allowed for efficient data deletions. To help comply with the customer-initiated data retention policy for GDPR deletes, scheduled jobs ran daily to identify the data to be deleted in batch mode.

We then spun up a transient EMR cluster to run the GDPR upsert job to delete the records. The data was stored in Amazon S3 in Hudi format, and Hudi’s built-in index allowed us to efficiently delete records using bloom filters and file ranges. Because only those files that contained the record keys needed to be read and rewritten, it only took about 1–2 minutes to delete 1,000 records out of the 1 billion records, which had previously taken hours to complete as entire partitions were read.
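
As a rough illustration of a Hudi record-level delete (not Zoom’s actual GDPR job), a transient EMR cluster could run a Spark job along these lines; the table name, paths, and key fields are hypothetical and carried over from the ingestion sketch above.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("gdpr-delete-sketch").getOrCreate()

# Hypothetical input: the record keys (with partition and precombine fields)
# identified by the daily batch job as subject to deletion.
keys_to_delete = spark.read.parquet("s3://example-bucket/gdpr/keys_to_delete/")

delete_options = {
    "hoodie.table.name": "application_logs",
    "hoodie.datasource.write.operation": "delete",          # record-level delete
    "hoodie.datasource.write.recordkey.field": "record_key",
    "hoodie.datasource.write.partitionpath.field": "event_date",
    "hoodie.datasource.write.precombine.field": "event_time",
}

# Hudi's index locates only the files containing these keys and rewrites them.
(keys_to_delete.write.format("hudi")
 .options(**delete_options)
 .mode("append")
 .save("s3://example-bucket/hudi/application_logs/"))
```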

Overall, our solution enabled efficient deletion of data, which provided an additional layer of data security that was critical for Zoom, in light of its GDPR requirements.

Architecting to optimize scale, performance, and cost

In this section, we share the following strategies Zoom took to optimize scale, performance, and cost:

  • Optimizing ingestion
  • Optimizing throughput and Amazon EMR utilization
  • Decoupling ingestion and GDPR deletion using EMRFS
  • Efficient deletes with Apache Hudi
  • Optimizing for low-latency reads with Apache Hudi
  • Monitoring

Optimizing ingestion

To keep the storage in Kafka lean and optimal, as well as to get a real-time view of data, we created a Spark job to read incoming Kafka messages in batches of 150 million messages and wrote to Amazon S3 in Hudi-compatible format every 5 minutes. Even during the initial stages of the iteration, when we hadn’t started scaling and tuning yet, we were able to successfully load all Kafka messages consistently under 2.5 minutes using the Amazon EMR runtime for Apache Spark.

Optimizing throughput and Amazon EMR utilization

We launched a cost-optimized EMR cluster and switched from uniform instance groups to using EMR instance fleets. We chose instance fleets because we needed the flexibility to use Spot Instances for task nodes and wanted to diversify the risk of running out of capacity for a specific instance type in our Availability Zone.

We started experimenting with test runs by first changing the number of Kafka partitions from 400 to 1,000, and then changing the number of task nodes and instance types. Based on the results of the run, the AWS team came up with the recommendation to use Amazon EMR with three core nodes (r5.16xlarge (64 vCPUs each)) and 18 task nodes using Spot fleet instances (a combination of r5.16xlarge (64 vCPUs), r5.12xlarge (48 vCPUs), r5.8xlarge (32 vCPUs)). These recommendations helped Zoom to reduce their Amazon EMR costs by more than 80% while meeting their desired performance goals of ingesting 150 million Kafka messages under 5 minutes.

Decoupling ingestion and GDPR deletion using EMRFS

A well-known benefit of separation of storage and compute is that you can scale the two independently. But a not-so-obvious advantage is that you can decouple continuous workloads from sporadic workloads. Previously data was stored in HDFS. Resource-intensive GDPR delete jobs and data movement jobs would compete for resources with the stream ingestion, causing a backlog of more than 5 hours in upstream Kafka clusters, which was close to filling up the Kafka storage (which only had 6 hours of data retention) and potentially causing data loss. Offloading data from HDFS to Amazon S3 allowed us the freedom to launch independent transient EMR clusters on demand to perform data deletion, helping to ensure that the ongoing data ingestion from Kafka into Amazon EMR is not starved for resources. This enabled the system to ingest data every 5 minutes and complete each Spark Streaming read in 2–3 minutes. Another side effect of using EMRFS is a cost-optimized cluster, because we removed reliance on Amazon Elastic Block Store (Amazon EBS) volumes for over 300 TB storage that was used for three copies (including two replicas) of HDFS data. We now pay for only one copy of the data in Amazon S3, which provides 11 9s of durability and is relatively inexpensive storage.

Efficient deletes with Apache Hudi

What about the conflict between ingest writes and GDPR deletes when running concurrently? This is where the power of Apache Hudi stands out.

Apache Hudi provides a table format for data lakes with transactional semantics that enables the separation of ingestion workloads and updates when run concurrently. The system was able to consistently delete 1,000 records in less than a minute. There were some limitations in concurrent writes in Apache Hudi 0.7.0, but the Amazon EMR team quickly addressed this by back-porting Apache Hudi 0.8.0, which supports optimistic concurrency control, to the current (at the time of the AWS Data Lab collaboration) Amazon EMR 6.4 release. This saved time in testing and allowed for a quick transition to the new version with minimal testing. This enabled us to query the data directly using Athena quickly without having to spin up a cluster to run ad hoc queries, as well as to query the data using Presto, Trino, and Hive. The decoupling of the storage and compute layers provided the flexibility to not only query data across different EMR clusters, but also delete data using a completely independent transient cluster.

Optimizing for low-latency reads with Apache Hudi

To optimize for low-latency reads with Apache Hudi, we needed to address the issue of too many small files being created within Amazon S3 due to the continuous streaming of data into the data lake.

We utilized Apache Hudi’s features to tune file sizes for optimal querying. Specifically, we reduced the degree of parallelism in Hudi from the default value of 1,500 to a lower number. Parallelism refers to the number of threads used to write data to Hudi; by reducing it, we were able to create larger files that were more optimal for querying.

Because we needed to optimize for high-volume streaming ingestion, we chose to implement the merge on read table type (instead of copy on write) for our workload. This table type allowed us to quickly ingest the incoming data into delta files in row format (Avro) and asynchronously compact the delta files into columnar Parquet files for fast reads. To do this, we ran the Hudi compaction job in the background. Compaction is the process of merging row-based delta files to produce new versions of columnar files. Because the compaction job would use additional compute resources, we adjusted the degree of parallelism for insertion to a lower value of 1,000 to account for the additional resource usage. This adjustment allowed us to create larger files without sacrificing performance throughput.
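
The following option map sketches how such tuning might be expressed with commonly used Hudi write properties; the values mirror the numbers mentioned above, but property names and defaults vary across Hudi versions, so treat this as an assumption-laden illustration rather than Zoom’s configuration.

```python
# Illustrative Hudi write options for high-volume streaming ingest.
tuning_options = {
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",   # fast row-format (Avro) delta writes
    "hoodie.insert.shuffle.parallelism": "1000",             # reduced write parallelism for larger files
    "hoodie.upsert.shuffle.parallelism": "1000",
    "hoodie.compact.inline": "false",                        # keep compaction out of the write path
    "hoodie.datasource.compaction.async.enable": "true",     # compact delta files to Parquet in the background
}
```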

Overall, our approach to optimizing for low-latency reads with Apache Hudi allowed us to better manage file sizes and improve the overall performance of our data lake.

Monitoring

The team monitored MSK clusters with Prometheus (an open-source monitoring tool). Additionally, we showcased how to monitor Spark streaming jobs using Amazon CloudWatch metrics. For more information, refer to Monitor Spark streaming applications on Amazon EMR.

Outcomes

The collaboration between Zoom and the AWS Data Lab demonstrated significant improvements in data ingestion, processing, storage, and deletion using an architecture with Amazon EMR and Apache Hudi. One key benefit of the architecture was a reduction in infrastructure costs, which was achieved through the use of cloud-native technologies and the efficient management of data storage. Another benefit was an improvement in data management capabilities.

We showed that the costs of EMR clusters can be reduced by about 82% while bringing the storage costs down by about 90% compared to the prior HDFS-based architecture. All of this while making the data available in the data lake within 5 minutes of ingestion from the source. We also demonstrated that data deletions from a data lake containing multiple petabytes of data can be performed much more efficiently. With our optimized approach, we were able to delete approximately 1,000 records in just 1–2 minutes, as compared to the previously required 3 hours or more.

Conclusion

In conclusion, the log analytics process, which involves collecting, processing, storing, analyzing, and deleting log data from various sources such as servers, applications, and devices, is critical to aid organizations in working to meet their service resiliency, security, performance monitoring, troubleshooting, and compliance needs, such as GDPR.

This post shared what Zoom and the AWS Data Lab team have accomplished together to solve critical data pipeline challenges, and Zoom has extended the solution further to optimize extract, transform, and load (ETL) jobs and resource efficiency. However, you can also use the architecture patterns presented here to quickly build cost-effective and scalable solutions for other use cases. Please reach out to your AWS team for more information or contact Sales.


About the Authors

Sekar Srinivasan is a Sr. Specialist Solutions Architect at AWS focused on Big Data and Analytics. Sekar has over 20 years of experience working with data. He is passionate about helping customers build scalable solutions, modernizing their architecture, and generating insights from their data. In his spare time he likes to work on non-profit projects focused on underprivileged children's education.

Chandra Dhandapani is a Senior Solutions Architect at AWS, where he specializes in creating solutions for customers in Analytics, AI/ML, and Databases. He has a lot of experience in building and scaling applications across different industries including Healthcare and Fintech. Outside of work, he is an avid traveler and enjoys sports, reading, and entertainment.

Amit Kumar Agrawal is a Senior Solutions Architect at AWS, based out of San Francisco Bay Area. He works with large strategic ISV customers to architect cloud solutions that address their business challenges. During his free time he enjoys exploring the outdoors with his family.

Viral Shah is an Analytics Sales Specialist who has been working with AWS for 5 years, helping customers to be successful in their data journey. He has over 20 years of experience working with enterprise customers and startups, primarily in the data and database space. He loves to travel and spend quality time with his family.

How AWS Payments migrated from Redash to Amazon Redshift Query Editor v2

Post Syndicated from Erol Murtezaoglu original https://aws.amazon.com/blogs/big-data/how-aws-payments-migrated-from-redash-to-amazon-redshift-query-editor-v2/

AWS Payments is part of the AWS Commerce Platform (CP) organization that owns the customer experience of paying AWS invoices. It helps AWS customers manage their payment methods and payment preferences, and helps customers make self-service payments to AWS.

The Machine Learning, Data and Analytics (MLDA) team at AWS Payments enables data-driven decision-making across payments processes and partners by delivering data, business insights, and causal and ML inferences through a scalable set of data, insights, and ML inference services.

In this post, we discuss how to democratize data access to Amazon Redshift using the Amazon Redshift Query Editor V2.

Background

At AWS Payments, we had been using Redash to allow our users to author and run SQL queries against our Amazon Redshift data warehouse. Redash is a web-based SQL client application that can be used to author and run queries, visualize query results with charts, and collaborate with teams.

Over time, we began to notice incompatibilities between Redash’s operations and the needs of our workload.

We had the following requirements in mind when looking for an alternative tool:

  • Authentication and authorization
    • Provide data access without creating a database user and password
    • Allowlist users via permission groups (POSIX/LDAP) for access to the tool
    • Limit user access to database objects
  • User experience
    • Run SQL queries on the selected database
    • Save a query and rerun it later
    • Write a dynamic SQL query and run the query based on input parameters
    • Export a query result to CSV
    • Search saved queries
    • Share a query with other users as a URL

After an evaluation of alternate services, we chose the Amazon Redshift Query Editor V2.

Amazon Redshift Query Editor V2

The Amazon Redshift Query Editor V2 has the following benefits:

  • It makes data more accessible to data analysts and data scientists by providing a unified web-based analyst workbench to explore, share, and collaborate on data through a SQL interface
  • It provides a managed service that allows you to focus on exploring your data without managing your infrastructure
  • Users can log in to the Query Editor using single sign-on (SSO)
  • Users can connect to Amazon Redshift using federated access without providing a user name and password
  • It enables you to collaborate with team members by providing the ability to share saved queries securely
  • You can benefit from new features as soon as they get released by the Amazon Redshift Query Editor team
  • You can keep track of changes made to saved queries using the Query History feature
  • You can write parameterized SQL queries, which allows you to reuse a query with different values
  • You can turn on the Chart feature to display a graphic visualization of the current page of results
  • You can use notebooks to organize, annotate, and share multiple SQL queries in a single document
  • You can run multiple queries in parallel by running each query in a separate tab

However, it presented the following challenges:

  • To restrict user access to other AWS services within our AWS account, we attached the AWS Identity and Access Management (IAM) policies (see the appendix at the end of this post) to the SAML IAM role. The policies ensure the following:
    • The user can only access the Query Editor V2 service.
    • The federated user gets assigned to a database group with limited access.
  • The Query Editor V2 currently doesn’t support cross-account Amazon Redshift connections. However, we set up Amazon Redshift data sharing to access the Amazon Redshift cluster from other AWS accounts. For more details, refer to Sharing data across clusters in Amazon Redshift.
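
For reference, the data sharing setup can be scripted. The following is a minimal sketch (not the exact commands we ran) that creates a datashare on the producer cluster and grants it to a consumer account through the Amazon Redshift Data API; the cluster identifier, database, database user, and consumer account ID are placeholders:

import boto3

# Placeholder names used for illustration only
CLUSTER_ID = "payments-producer-cluster"
DATABASE = "payments_beta"
CONSUMER_ACCOUNT = "111122223333"

redshift_data = boto3.client("redshift-data", region_name="us-east-1")

statements = [
    "CREATE DATASHARE payments_share;",
    "ALTER DATASHARE payments_share ADD SCHEMA public;",
    "ALTER DATASHARE payments_share ADD ALL TABLES IN SCHEMA public;",
    f"GRANT USAGE ON DATASHARE payments_share TO ACCOUNT '{CONSUMER_ACCOUNT}';",
]

for sql in statements:
    # Each statement runs asynchronously; the returned Id can be polled
    # with DescribeStatement to confirm completion.
    response = redshift_data.execute_statement(
        ClusterIdentifier=CLUSTER_ID,
        Database=DATABASE,
        DbUser="admin",  # placeholder database user
        Sql=sql,
    )
    print(response["Id"])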

Architecture overview

The following diagram illustrates our architecture.
The diagram illustrates our architecture
In the following sections, we walk you through the steps to set up the query editor and migrate Redash queries.

Prerequisites

To implement this solution, you must set up federated access to the Amazon Redshift Query Editor V2 using your identity provider (IdP) services.

You can find more information in the following posts:

Set up Amazon Redshift Query Editor V2

To set up the query editor, complete the following steps:

  1. Create an Amazon Redshift database group with read-only access.
  2. Create an IAM role for accessing Query Editor V2 in an AWS account and attach the required IAM policies based on your use case. For more information, refer to Configuring your AWS account.
  3. Create a trust relationship between your IdP and AWS.
  4. Add the principal tag sqlworkbench-team to the IAM role to share queries. For more information, refer to Sharing a query.
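
Tagging the role can also be scripted. The following is a minimal sketch using boto3; the role name and team value are placeholders:

import boto3

iam = boto3.client("iam")

# Attach the sqlworkbench-team principal tag to the SAML IAM role so that
# saved queries can be shared within the team. Role name and team value
# are placeholders.
iam.tag_role(
    RoleName="QueryEditorV2FederatedRole",
    Tags=[{"Key": "sqlworkbench-team", "Value": "payments-mlda"}],
)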

Migrate Redash queries to Amazon Redshift Query Editor V2

In this section, we walk you through different ways to migrate your Redash queries to the Amazon Redshift Query Editor V2.

Query without parameters

Querying without parameters is straightforward: copy your query from Redash and enter it in the query editor.

  1. In Redash, navigate to the saved query and choose Edit Source.
  2. Copy the source query.
  3. In Amazon Redshift Query Editor V2, enter the query into the editor, choose the Save icon, and give your query a title.

Query with parameters

In Redash, a string between {{ }} is treated as a parameter, but Amazon Redshift Query Editor V2 uses ${ } to identify a parameter. To migrate queries with parameters, follow the same steps but replace {{ with ${ and }} with }.
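
If you have many saved queries, you can script this substitution. The following sketch (not part of the original migration tooling) converts Redash-style parameters to Query Editor V2 syntax:

import re

def convert_redash_parameters(sql: str) -> str:
    """Convert Redash {{ param }} placeholders to Query Editor V2 ${param} syntax."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", r"${\1}", sql)

redash_sql = "SELECT * FROM payments WHERE region = {{ region }} AND year = {{ year }}"
print(convert_redash_parameters(redash_sql))
# SELECT * FROM payments WHERE region = ${region} AND year = ${year}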

The following screenshot shows an example query in Redash.

The following screenshot shows the same query in Amazon Redshift Query Editor V2.


Multi-part query to a Query Editor V2 notebook

For a multi-part query, copy the query from each section of a Redash dashboard and add it to a notebook. The notebook in Amazon Redshift Query Editor V2 runs queries sequentially. You can also add a description for each query.

The following screenshot shows an example query on the Redash dashboard.
The following screenshot shows the query in an Amazon Redshift Query Editor V2 notebook.

Summary

In this post, we demonstrated how we set up Amazon Redshift Query Editor V2 with SSO and Amazon Redshift federated access, and migrated our customers from Redash to Amazon Redshift Query Editor V2. This solution reduced our operational cost of maintaining a third-party application and its infrastructure.

If you have similar use cases and need to provide a web-based tool to your customers to explore data on your Amazon Redshift cluster, consider using Amazon Redshift Query Editor V2.

Appendix: Customer IAM policies

In this section, we provide the code for the IAM policies we attached to the SAML IAM role to restrict user access to other AWS services within our AWS account:

  • query-editor-credentials-policy – In the following code, provide your Region, account, and cluster parameters to grant access to Amazon Redshift to get cluster credentials, create users, and allow users to join groups:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": "redshift:GetClusterCredentials",
            "Resource": [
                "arn:aws:redshift:<region>:<account>:cluster:<cluster>",
                "arn:aws:redshift:<region>:<account>:dbname:<cluster>/payments_beta",
                "arn:aws:redshift:<region>:<account>:dbuser:<cluster>/${redshift:DbUser}"
            ],
            "Effect": "Allow"
        },
        {
            "Action": "redshift:JoinGroup",
            "Resource": "arn:aws:redshift:<region>:<account>:dbgroup:<cluster>/payments_ro_users",
            "Effect": "Allow"
        },
        {
            "Action": "redshift:DescribeClusters",
            "Resource": "arn:aws:redshift:<region>:<account>:cluster:<cluster>",
            "Effect": "Allow"
        },
        {
            "Action": "redshift:CreateClusterUser",
            "Resource": "arn:aws:redshift:<region>:<account>:dbuser:<cluster>/${redshift:DbUser}",
            "Effect": "Allow"
        }
    ]
}
  • query-editor-access-policy – See the following code:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": "redshift:DescribeClusters",
            "Resource": "*",
            "Effect": "Allow",
            "Sid": "RedshiftPermissions"
        },
        {
            "Condition": {
                "StringEquals": {
                    "secretsmanager:ResourceTag/sqlworkbench-resource-owner": "${aws:userid}"
                }
            },
            "Action": [
                "secretsmanager:CreateSecret",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DeleteSecret",
                "secretsmanager:TagResource"
            ],
            "Resource": "arn:aws:secretsmanager:::sqlworkbench!",
            "Effect": "Allow",
            "Sid": "SecretsManagerPermissions"
        },
        {
            "Condition": {
                "StringEquals": {
                    "aws:CalledViaLast": "sqlworkbench.amazonaws.com"
                }
            },
            "Action": "tag:GetResources",
            "Resource": "*",
            "Effect": "Allow",
            "Sid": "ResourceGroupsTaggingPermissions"
        },
        {
            "Action": [
                "sqlworkbench:CreateFolder",
                "sqlworkbench:PutTab",
                "sqlworkbench:BatchDeleteFolder",
                "sqlworkbench:DeleteTab",
                "sqlworkbench:GenerateSession",
                "sqlworkbench:GetAccountInfo",
                "sqlworkbench:GetAccountSettings",
                "sqlworkbench:GetUserInfo",
                "sqlworkbench:GetUserWorkspaceSettings",
                "sqlworkbench:PutUserWorkspaceSettings",
                "sqlworkbench:ListConnections",
                "sqlworkbench:ListFiles",
                "sqlworkbench:ListTabs",
                "sqlworkbench:UpdateFolder",
                "sqlworkbench:ListRedshiftClusters",
                "sqlworkbench:DriverExecute",
                "sqlworkbench:ListTaggedResources"
            ],
            "Resource": "*",
            "Effect": "Allow",
            "Sid": "AmazonRedshiftQueryEditorV2NonResourceLevelPermissions"
        },
        {
            "Condition": {
                "StringEquals": {
                    "aws:RequestTag/sqlworkbench-resource-owner": "${aws:userid}"
                }
            },
            "Action": [
                "sqlworkbench:CreateConnection",
                "sqlworkbench:CreateSavedQuery",
                "sqlworkbench:CreateChart"
            ],
            "Resource": "*",
            "Effect": "Allow",
            "Sid": "AmazonRedshiftQueryEditorV2CreateOwnedResourcePermissions"
        },
        {
            "Condition": {
                "StringEquals": {
                    "aws:ResourceTag/sqlworkbench-resource-owner": "${aws:userid}"
                }
            },
            "Action": [
                "sqlworkbench:DeleteChart",
                "sqlworkbench:DeleteConnection",
                "sqlworkbench:DeleteSavedQuery",
                "sqlworkbench:GetChart",
                "sqlworkbench:GetConnection",
                "sqlworkbench:GetSavedQuery",
                "sqlworkbench:ListSavedQueryVersions",
                "sqlworkbench:UpdateChart",
                "sqlworkbench:UpdateConnection",
                "sqlworkbench:UpdateSavedQuery",
                "sqlworkbench:AssociateConnectionWithTab",
                "sqlworkbench:AssociateQueryWithTab",
                "sqlworkbench:AssociateConnectionWithChart",
                "sqlworkbench:UpdateFileFolder",
                "sqlworkbench:ListTagsForResource"
            ],
            "Resource": "*",
            "Effect": "Allow",
            "Sid": "AmazonRedshiftQueryEditorV2OwnerSpecificPermissions"
        },
        {
            "Condition": {
                "StringEquals": {
                    "aws:ResourceTag/sqlworkbench-resource-owner": "${aws:userid}",
                    "aws:RequestTag/sqlworkbench-resource-owner": "${aws:userid}"
                },
                "ForAllValues:StringEquals": {
                    "aws:TagKeys": "sqlworkbench-resource-owner"
                }
            },
            "Action": "sqlworkbench:TagResource",
            "Resource": "*",
            "Effect": "Allow",
            "Sid": "AmazonRedshiftQueryEditorV2TagOnlyUserIdPermissions"
        },
        {
            "Condition": {
                "StringEquals": {
                    "aws:ResourceTag/sqlworkbench-team": "${aws:PrincipalTag/sqlworkbench-team}"
                }
            },
            "Action": [
                "sqlworkbench:GetChart",
                "sqlworkbench:GetConnection",
                "sqlworkbench:GetSavedQuery",
                "sqlworkbench:ListSavedQueryVersions",
                "sqlworkbench:ListTagsForResource",
                "sqlworkbench:AssociateQueryWithTab"
            ],
            "Resource": "*",
            "Effect": "Allow",
            "Sid": "AmazonRedshiftQueryEditorV2TeamReadAccessPermissions"
        },
        {
            "Condition": {
                "StringEquals": {
                    "aws:ResourceTag/sqlworkbench-resource-owner": "${aws:userid}",
                    "aws:RequestTag/sqlworkbench-team": "${aws:PrincipalTag/sqlworkbench-team}"
                }
            },
            "Action": "sqlworkbench:TagResource",
            "Resource": "*",
            "Effect": "Allow",
            "Sid": "AmazonRedshiftQueryEditorV2TagOnlyTeamPermissions"
        },
        {
            "Condition": {
                "StringEquals": {
                    "aws:ResourceTag/sqlworkbench-resource-owner": "${aws:userid}"
                },
                "ForAllValues:StringEquals": {
                    "aws:TagKeys": "sqlworkbench-team"
                }
            },
            "Action": "sqlworkbench:UntagResource",
            "Resource": "*",
            "Effect": "Allow",
            "Sid": "AmazonRedshiftQueryEditorV2UntagOnlyTeamPermissions"
        }
    ]
}
  • query-editor-notebook-policy – See the following code:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "sqlworkbench:ListNotebooks",
                "sqlworkbench:ListNotebookVersions",
                "sqlworkbench:ListQueryExecutionHistory"
            ],
            "Resource": "*",
            "Effect": "Allow"
        },
        {
            "Condition": {
                "StringEquals": {
                    "aws:RequestTag/sqlworkbench-resource-owner": "${aws:userid}"
                }
            },
            "Action": [
                "sqlworkbench:CreateNotebook",
                "sqlworkbench:ImportNotebook",
                "sqlworkbench:DuplicateNotebook"
            ],
            "Resource": "*",
            "Effect": "Allow"
        },
        {
            "Condition": {
                "StringEquals": {
                    "aws:ResourceTag/sqlworkbench-resource-owner": "${aws:userid}"
                }
            },
            "Action": [
                "sqlworkbench:GetNotebook",
                "sqlworkbench:UpdateNotebook",
                "sqlworkbench:DeleteNotebook",
                "sqlworkbench:CreateNotebookCell",
                "sqlworkbench:DeleteNotebookCell",
                "sqlworkbench:UpdateNotebookCellContent",
                "sqlworkbench:UpdateNotebookCellLayout",
                "sqlworkbench:BatchGetNotebookCell",
                "sqlworkbench:AssociateNotebookWithTab",
                "sqlworkbench:ExportNotebook",
                "sqlworkbench:CreateNotebookVersion",
                "sqlworkbench:GetNotebookVersion",
                "sqlworkbench:CreateNotebookFromVersion",
                "sqlworkbench:DeleteNotebookVersion",
                "sqlworkbench:RestoreNotebookVersion"
            ],
            "Resource": "*",
            "Effect": "Allow"
        },
        {
            "Condition": {
                "StringEquals": {
                    "aws:ResourceTag/sqlworkbench-team": "${aws:PrincipalTag/sqlworkbench-team}"
                }
            },
            "Action": [
                "sqlworkbench:GetNotebook",
                "sqlworkbench:BatchGetNotebookCell",
                "sqlworkbench:AssociateNotebookWithTab"
            ],
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}

About the Authors

Mohammad Nejad leads the AWS Payments Data Platform team. He has experience leading teams, architecting designs, implementing solutions, and launching products. Currently, his team focuses on building a modern data platform on AWS to provide a complete solution for processing, analyzing, and presenting data.

Erol Murtezaoglu, a Technical Product Manager at AWS, is an inquisitive and enthusiastic thinker with a drive for self-improvement and learning. He has a strong and proven technical background in software development and architecture, balanced with a drive to deliver commercially successful products. Erol highly values the process of understanding customer needs and problems, in order to deliver solutions that exceed expectations.

Mohamed Shaaban is a Senior Software Engineer in Amazon Redshift and is based in Berlin, Germany. He has over 12 years of experience in software engineering. He is passionate about cloud services and building solutions that delight customers. Outside of work, he is an amateur photographer who loves to explore and capture unique moments.

Let’s Architect! Architecting for sustainability

Post Syndicated from Luca Mezzalira original https://aws.amazon.com/blogs/architecture/lets-architect-architecting-for-sustainability/

Sustainability is an important topic in the tech industry, as well as in society as a whole. It is defined as the ability to continue to perform a process or function over an extended period of time without depleting natural resources or degrading the environment.

One of the key elements of designing a sustainable workload is software architecture. Think about how event-driven architecture can help reduce the load across multiple microservices, leveraging solutions like batching and queues. In these cases, the main traffic is absorbed at the entry point of a cloud workload and smoothed out inside your system. Beyond architecture, think about data patterns, hardware optimizations, multi-environment strategies, and the many other aspects of a software development lifecycle that can contribute to your sustainability posture in the cloud.

The key takeaway: designing with sustainability in mind can help you build an application that is not only durable but also flexible enough to maintain the agility your business requires.

In this edition of Let’s Architect!, we share hands-on activities, case studies, and tips and tricks for making your Cloud applications more sustainable.

Architecting sustainably and reducing your AWS carbon footprint

Amazon Web Services (AWS) launched the Sustainability Pillar of the AWS Well-Architected Framework to help organizations evaluate and optimize their use of AWS services, and built the customer carbon footprint tool so organizations can monitor, analyze, and reduce their AWS footprint.

This session provides updates on these programs and highlights the most effective techniques for optimizing your AWS architectures. Find out how Amazon Prime Video used these tools to establish baselines and drive significant efficiencies across their AWS usage.

Take me to this re:Invent 2022 video!

Prime Video case study for understanding how the architecture can be designed for sustainability

Optimize your modern data architecture for sustainability

The modern data architecture is the foundation for a sustainable and scalable platform that enables business intelligence. This AWS Architecture Blog series provides tips on how to develop a modern data architecture with sustainability in mind.

Composed of two posts, the series helps you revisit and enhance your current data architecture without compromising sustainability.

Take me to Part 1! | Take me to Part 2!

An AWS data architecture; it’s now time to account for sustainability

AWS Well-Architected Labs: Sustainability

This workshop introduces participants to the AWS Well-Architected Framework, a set of best practices for designing and operating high-performing, highly scalable, and cost-efficient applications on AWS. The workshop also discusses how sustainability is critical to software architecture and how to use the AWS Well-Architected Framework to improve your application’s sustainability performance.

Take me to this workshop!

Sustainability implementation best practices and monitoring

Sustainability in the cloud with Rust and AWS Graviton

In this video, you can learn about the benefits of Rust and AWS Graviton for reducing energy consumption and increasing performance. Rust combines the resource efficiency of languages like C with the memory safety of languages like Java. The video also explains the benefits of AWS Graviton processors, which are designed to deliver performance- and cost-optimized cloud workloads. This resource is very helpful for understanding how sustainability can become a driver for cost optimization.

Take me to this re:Invent 2022 video!

Discover how Rust and AWS Graviton can help you make your workload more sustainable and performant

See you next time!

Thanks for joining us to discuss sustainability in the cloud! See you in two weeks when we’ll talk about tools for architects.

To find all the blogs from this series, you can check the Let’s Architect! list of content on the AWS Architecture Blog.

How GoDaddy built a data mesh to decentralize data ownership using AWS Lake Formation

Post Syndicated from Ankit Jhalaria original https://aws.amazon.com/blogs/big-data/how-godaddy-built-a-data-mesh-to-decentralize-data-ownership-using-aws-lake-formation/

This is a guest post co-written with Ankit Jhalaria from GoDaddy.

GoDaddy is empowering everyday entrepreneurs by providing all the help and tools to succeed online. With more than 20 million customers worldwide, GoDaddy is the place people come to name their idea, build a professional website, attract customers, and manage their work.

GoDaddy is a data-driven company, and getting meaningful insights from data helps them drive business decisions to delight their customers. In 2018, GoDaddy began a large infrastructure revamp and partnered with AWS to innovate faster than ever before to meet the needs of its customer growth around the world. As part of this revamp, the GoDaddy Data Platform team wanted to set the company up for long-term success by creating a well-defined data strategy and setting goals to decentralize the ownership and processing of data.

In this post, we discuss how GoDaddy uses AWS Lake Formation to simplify security management and data governance at scale, and enable data as a service (DaaS) supporting organization-wide data accessibility with cross-account data sharing using a data mesh architecture.

The challenge

In the vast ocean of data, deriving useful insights is an art. Prior to the AWS partnership, GoDaddy had a shared Hadoop cluster on premises that various teams used to create and share datasets with other analysts for collaboration. As the teams grew, copies of data started to grow in the Hadoop Distributed File System (HDFS). Several teams started to build tooling to manage this challenge independently, duplicating efforts. Managing permissions on these data assets became harder, and making data discoverable across a growing number of data catalogs and systems became a significant challenge. Although storage these days is relatively inexpensive, when several copies of the same data asset exist, it becomes harder for analysts to use the data efficiently and reliably. Business analysts need robust pipelines on the key datasets that they rely on to make business decisions.

Solution overview

In GoDaddy’s data mesh hub and spoke model, a central data catalog contains information about all the data products that exist in the company. In AWS terminology, this is the AWS Glue Data Catalog. The data platform team provides APIs, SDKs, and Airflow Operators as components that different teams use to interact with the catalog. Activities such as updating the metastore to reflect a new partition for a given data product, and occasionally running MSCK repair operations, are all handled in the central governance account, and Lake Formation is used to secure access to the Data Catalog.
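
As an illustration of this partition-update activity, the following sketch registers a new daily partition in the central AWS Glue Data Catalog using boto3; the database, table, S3 location, and partition value are hypothetical and not GoDaddy's actual tooling:

import boto3

glue = boto3.client("glue", region_name="us-west-2")

# Register a new daily partition for a data product in the central catalog.
# Database, table, S3 location, and partition value are illustrative only.
glue.batch_create_partition(
    DatabaseName="orders_domain",
    TableName="orders_daily",
    PartitionInputList=[
        {
            "Values": ["2022-10-01"],
            "StorageDescriptor": {
                "Location": "s3://example-data-product-bucket/orders_daily/ds=2022-10-01/",
                "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
                "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
                "SerdeInfo": {
                    "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
                },
            },
        }
    ],
)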

The data platform team introduced a layer of data governance that ensures best practices for building data products are followed throughout the company. We provide the tooling to support data engineers and business analysts while leaving the domain experts to run their data pipelines. With this approach, we have well-curated data products that are intuitive and easy to understand for our business analysts.

A data product refers to an entity that powers insights for analytical purposes. In simple terms, this could refer to an actual dataset pointing to a location in Amazon Simple Storage Service (Amazon S3). Data producers are responsible for the processing of data and creating new snapshots or partitions depending on the business needs. In some cases, data is refreshed every 24 hours, and other cases, every hour. Data consumers come to the data mesh to consume data, and permissions are managed in the central governance account through Lake Formation. Lake Formation uses AWS Resource Access Manager (AWS RAM) to send resource shares to different consumer accounts to be able to access the data from the central governance account. We go into details about this functionality later in the post.

The following diagram illustrates the solution architecture.

Solution architecture illustrated

Defining metadata with the central schema repository

Data is only useful if end-users can derive meaningful insights from it—otherwise, it’s just noise. As part of onboarding with the data platform, a data producer registers their schema with the data platform along with relevant metadata. This is reviewed by the data governance team that ensures best practices for creating datasets are followed. We have automated some of the most common data governance review items. This is also the place where producers define a contract about reliable data deliveries, often referred to as Service Level Objective (SLO). After a contract is in place, the data platform team’s background processes monitor and send out alerts when data producers fail to meet their contract or SLO.

When managing permissions with Lake Formation, you register the Amazon S3 location of different S3 buckets. Lake Formation uses AWS RAM to share the named resource.
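
A minimal sketch of this registration step, with a placeholder bucket and prefix, looks like the following:

import boto3

lakeformation = boto3.client("lakeformation", region_name="us-west-2")

# Register the producer's S3 location with Lake Formation so that access to
# the corresponding Data Catalog tables and underlying data can be governed
# centrally. The bucket and prefix are placeholders.
lakeformation.register_resource(
    ResourceArn="arn:aws:s3:::example-data-product-bucket/orders_daily/",
    UseServiceLinkedRole=True,
)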

When managing resources with AWS RAM, the central governance account creates AWS RAM shares. The data platform provides a custom AWS Service Catalog product to accept AWS RAM shares in consumer accounts.

Having consistent schemas with meaningful names and descriptions makes the discovery of datasets easy. Every data producer who is a domain expert is responsible for creating well-defined schemas that business users use to generate insights to make key business decisions. Data producers register their schemas along with additional metadata with the data lake repository. Metadata includes information about the team responsible for the dataset, such as their SLO contract, description, and contact information. This information gets checked into a Git repository where automation kicks in and validates the request to make sure it conforms to standards and best practices. We use AWS CloudFormation templates to provision resources. The following code is a sample of what the registration metadata looks like.

Sample code of what the registration metadata looks like

As part of the registration process, automation steps run in the background to take care of the following on behalf of the data producer:

  • Register the producer’s Amazon S3 location of the data with Lake Formation – This allows us to use Lake Formation for fine-grained access to control the table in the AWS Glue Data Catalog that refers to this location as well as to the underlying data.
  • Create the underlying AWS Glue database and table – Based on the schema specified by the data producer along with the metadata, we create the underlying AWS Glue database and table in the central governance account. As part of this, we also use table properties of AWS Glue to store additional metadata to use later for analysis.
  • Define the SLO contract – Any business-critical dataset needs to have a well-defined SLO contract. As part of dataset registration, the data producer defines a contract with a cron expression that gets used by the data platform to create an event rule in Amazon EventBridge. This rule triggers an AWS Lambda function to watch for deliveries of the data and triggers an alert to the data producer’s Slack channel if they breach the contract.
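
The following sketch illustrates the SLO monitoring setup described in the last item; the rule name, cron expression, and function names are hypothetical:

import boto3

events = boto3.client("events", region_name="us-west-2")
lambda_client = boto3.client("lambda", region_name="us-west-2")

RULE_NAME = "orders-daily-slo-check"
FUNCTION_ARN = "arn:aws:lambda:us-west-2:111122223333:function:slo-delivery-watcher"

# Create a scheduled rule from the producer's SLO cron expression
# (here: 06:00 UTC every day, illustrative only).
events.put_rule(
    Name=RULE_NAME,
    ScheduleExpression="cron(0 6 * * ? *)",
    State="ENABLED",
)

# Point the rule at the Lambda function that checks for the expected delivery
# and alerts the producer's Slack channel if the contract is breached.
events.put_targets(
    Rule=RULE_NAME,
    Targets=[{"Id": "slo-watcher", "Arn": FUNCTION_ARN}],
)

# Allow EventBridge to invoke the function.
lambda_client.add_permission(
    FunctionName="slo-delivery-watcher",
    StatementId=f"{RULE_NAME}-invoke",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
)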

Consuming data from the data mesh catalog

When a data consumer belonging to a given line of business (LOB) identifies the data product that they’re interested in, they submit a request to the central governance team containing their AWS account ID that they use to query the data. The data platform provides a portal to discover datasets across the company. After the request is approved, automation runs to create an AWS RAM share with the consumer account covering the AWS Glue database and tables mapped to the data product registered in the AWS Glue Data Catalog of the central governance account.

The following screenshot shows an example of a resource share.

Example of a resource share

The consumer data lake admin needs to accept the AWS RAM share and create a resource link in Lake Formation to start querying the shared dataset within their account. We automated this process by building an AWS Service Catalog product that runs in the consumer’s account as a Lambda function that accepts shares on behalf of consumers.
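
A simplified sketch of such a share-acceptance function (not GoDaddy's actual AWS Service Catalog product) could look like the following:

import boto3

ram = boto3.client("ram", region_name="us-west-2")

def accept_pending_shares(event, context):
    """Accept any pending AWS RAM invitations sent from the central governance account."""
    invitations = ram.get_resource_share_invitations()["resourceShareInvitations"]
    accepted = 0
    for invitation in invitations:
        if invitation["status"] == "PENDING":
            ram.accept_resource_share_invitation(
                resourceShareInvitationArn=invitation["resourceShareInvitationArn"]
            )
            accepted += 1
    return {"accepted": accepted}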

When the resource linked datasets are available in the consumer account, the consumer data lake admin provides grants to IAM users and roles mapping to data consumers within the account. These consumers (application or user persona) can now query the datasets using AWS analytics services of their choice like Amazon Athena and Amazon EMR based on the access privileges granted by the consumer data lake admin.

Day-to-day operations and metrics

Managing permissions using Lake Formation is one part of the overall ecosystem. After permissions have been granted, data producers create new snapshots of the data at a certain cadence that can vary from every 15 minutes to a day. Data producers are integrated with the data platform APIs, which inform the platform about any new refreshes of the data. The data platform automatically writes a 0-byte _SUCCESS file for every dataset that gets refreshed, and notifies the subscribed consumer account via an Amazon Simple Notification Service (Amazon SNS) topic in the central governance account. Consumers use this as a signal to trigger their data pipelines and processes to start processing the newer version of the data using an event-driven approach.
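
A simplified sketch of this signaling step follows; the bucket, key, and topic ARN are placeholders:

import boto3

s3 = boto3.client("s3")
sns = boto3.client("sns")

TOPIC_ARN = "arn:aws:sns:us-west-2:111122223333:orders-daily-refresh"

# Write a 0-byte _SUCCESS marker for the refreshed partition, then notify
# subscribed consumer accounts so they can trigger their own pipelines.
s3.put_object(
    Bucket="example-data-product-bucket",
    Key="orders_daily/ds=2022-10-01/_SUCCESS",
    Body=b"",
)

sns.publish(
    TopicArn=TOPIC_ARN,
    Subject="orders_daily refreshed",
    Message='{"data_product": "orders_daily", "partition": "2022-10-01"}',
)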

There are over 2,000 data products built on the GoDaddy data mesh on AWS. Every day, there are thousands of updates to the AWS Glue metastore in the central data governance account. There are hundreds of data producers generating data every hour in a wide array of S3 buckets, and thousands of data consumers consuming data across a wide array of tools, including Athena, Amazon EMR, and Tableau from different AWS accounts.

Business outcomes

With the move to AWS, GoDaddy’s Data Platform team laid the foundations to build a modern data platform that has increased our velocity of building data products and delighting our customers. The data platform has successfully transitioned from a monolithic platform to a model where ownership of data has been decentralized. We accelerated the data platform adoption to over 10 lines of business and over 300 teams globally, and are successfully managing multiple petabytes of data spread across hundreds of accounts to help our business derive insights faster.

Conclusion

GoDaddy’s hub and spoke data mesh architecture built using Lake Formation simplifies security management and data governance at scale, to deliver data as a service supporting company-wide data accessibility. Our data mesh manages multiple petabytes of data across hundreds of accounts, enabling decentralized ownership of well-defined datasets with automation in place, which helps the business discover data assets quicker and derive business insights faster.

This post illustrates the use of Lake Formation to build a data mesh architecture that enables a DaaS model for a modernized enterprise data platform. For more information, see Design a data mesh architecture using AWS Lake Formation and AWS Glue.


About the Authors

Ankit Jhalaria is the Director Of Engineering on the Data Platform at GoDaddy. He has over 10 years of experience working in big data technologies. Outside of work, Ankit loves hiking, playing board games, building IoT projects, and contributing to open-source projects.

Harsh Vardhan is an AWS Solutions Architect, specializing in Analytics. He has over 6 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Kyle Tedeschi is a Principal Solutions Architect at AWS. He enjoys helping customers innovate, transform, and become leaders in their respective domains. Outside of work, Kyle is an avid snowboarder, car enthusiast, and traveler.

How SumUp built a low-latency feature store using Amazon EMR and Amazon Keyspaces

Post Syndicated from Shaheer Masoor original https://aws.amazon.com/blogs/big-data/how-sumup-built-a-low-latency-feature-store-using-amazon-emr-and-amazon-keyspaces/

This post was co-authored by Vadym Dolin, Data Architect at SumUp. In their own words, SumUp is a leading financial technology company, operating across 35 markets on three continents. SumUp helps small businesses be successful by enabling them to accept card payments in-store, in-app, and online, in a simple, secure, and cost-effective way. Today, SumUp card readers and other financial products are used by more than 4 million merchants around the world.

The SumUp Engineering team is committed to developing convenient, impactful, and secure financial products for merchants. To fulfill this vision, SumUp is increasingly investing in artificial intelligence and machine learning (ML). The internal ML platform in SumUp enables teams to seamlessly build, deploy, and operate ML solutions at scale.

One of the central elements of SumUp’s ML platform is the online feature store. It allows multiple ML models to retrieve feature vectors with single-digit millisecond latency, and enables application of AI for latency-critical use cases. The platform processes hundreds of transactions every second, with volume spikes during peak hours, and has steady growth that doubles the number of transactions every year. Because of this, the ML platform requires its low-latency feature store to be also highly reliable and scalable.

In this post, we show how SumUp built a millisecond-latency feature store. We also discuss the architectural considerations when setting up this solution so it can scale to serve multiple use cases, and present results showcasing the setup’s performance.

Overview of solution

To train ML models, we need historical data. During this phase, data scientists experiment with different features to test which ones produce the best model. From a platform perspective, we need to support bulk read and write operations. Read latency isn’t critical at this stage because the data is read into training jobs. After the models are trained and moved to production for real-time inference, the requirements for the platform change: we need to support low-latency reads and use only the latest feature data.

To fulfill these needs, SumUp built a feature store consisting of offline and online data stores. These were optimized for the requirements as described in the following table.

Data Store | History Requirements | ML Workflow Requirements | Latency Requirements | Storage Requirements | Throughput Requirements | Storage Medium
Offline | Entire history | Training | Not important | Cost-effective for large volumes | Bulk reads and writes | Amazon S3
Online | Only the latest features | Inference | Single-digit millisecond | Not important | Read optimized | Amazon Keyspaces

Amazon Keyspaces (for Apache Cassandra) is a serverless, scalable, and managed Apache Cassandra–compatible database service. It is built for consistent, single-digit-millisecond response times at scale. SumUp uses Amazon Keyspaces as a key-value pair store, and these features make it suitable for their online feature store. Delta Lake is an open-source storage layer that supports ACID transactions and is fully compatible with Apache Spark, making it highly performant at bulk read and write operations. You can store Delta Lake tables on Amazon Simple Storage Service (Amazon S3), which makes it a good fit for the offline feature store. Data scientists can use this stack to train models against the offline feature store (Delta Lake). When the trained models are moved to production, we switch to using the online feature store (Amazon Keyspaces), which offers the latest features set, scalable reads, and much lower latency.

Another important consideration is that we write a single feature job to populate both feature stores. Otherwise, SumUp would have to maintain two sets of code or pipelines for each feature creation job. We use Amazon EMR and create the features using PySpark DataFrames. The same DataFrame is written to both Delta Lake and Amazon Keyspaces, which eliminates the hurdle of having separate pipelines.
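
Conceptually, the dual write looks like the following PySpark sketch; the paths and table names are illustrative, and the Amazon Keyspaces connection settings are assumed to come from the connector configuration described later in this post:

# Runs on the EMR cluster with Delta Lake and the Spark Cassandra connector
# available; `spark` is the notebook SparkSession, and names and paths below
# are illustrative.
features_df = spark.read.parquet("s3://featurestore-blogpost-bucket/Dataset/")

# Offline feature store: append the full history to a Delta Lake table on S3.
(features_df.write
    .format("delta")
    .mode("append")
    .save("s3://featurestore-blogpost-bucket/offline/energy_data_features"))

# Online feature store: write the latest features into Amazon Keyspaces.
(features_df.write
    .format("org.apache.spark.sql.cassandra")
    .options(keyspace="feature_store", table="energy_data_features")
    .mode("append")
    .save())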

Finally, SumUp wanted to utilize managed services. It was important to SumUp that data scientists and data engineers focus their efforts on building and deploying ML models. SumUp had experimented with managing their own Cassandra cluster, and found it difficult to scale because it required specialized expertise. Amazon Keyspaces offered scalability without management and maintenance overhead. For running Spark workloads, we decided to use Amazon EMR. Amazon EMR makes it easy to provision new clusters and automatically or manually add and remove capacity as needed. You can also define a custom policy for auto scaling the cluster to suit your needs. Amazon EMR version 6.0.0 and above supports Spark version 3.0.0, which is compatible with Delta Lake.

It took SumUp 3 months from testing out AWS services to building a production-grade feature store capable of serving ML models. In this post, we share a simplified version of the stack, consisting of the following components:

  • S3 bucket A – Stores the raw data
  • EMR cluster – For running PySpark jobs for populating the feature store
  • Amazon Keyspaces feature_store – Stores the online features table
  • S3 Bucket B – Stores the Delta Lake table for offline features
  • IAM role feature_creator – For running the feature job with the appropriate permissions
  • Notebook instance – For running the feature engineering code

We use a simplified version of the setup to make it easy to follow the code examples. SumUp data scientists use Jupyter notebooks for exploratory analysis of the data. Feature engineering jobs are deployed using an AWS Step Functions state machine, which consists of an AWS Lambda function that submits a PySpark job to the EMR cluster.
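
A minimal sketch of such a Lambda function, submitting the PySpark job as an EMR step, could look like the following; the cluster ID and script location are placeholders:

import boto3

emr = boto3.client("emr", region_name="eu-west-1")

def submit_feature_job(event, context):
    """Submit the feature engineering PySpark job as a step on the EMR cluster."""
    response = emr.add_job_flow_steps(
        JobFlowId=event["cluster_id"],  # e.g. "j-XXXXXXXXXXXXX", placeholder
        Steps=[
            {
                "Name": "feature-creation",
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                        "spark-submit",
                        "--deploy-mode", "cluster",
                        "s3://featurestore-blogpost-bucket/EMR/feature_job.py",  # placeholder script
                    ],
                },
            }
        ],
    )
    return response["StepIds"]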

The following diagram illustrates our simplified architecture.

Prerequisites

To follow the solution, you need certain access rights and AWS Identity and Access Management (IAM) privileges:

  • An IAM user with AWS Command Line Interface (AWS CLI) access to an AWS account
  • IAM privileges to do the following:
    • Generate Amazon Keyspaces credentials
    • Create a keyspace and table
    • Create an S3 bucket
    • Create an EMR cluster
    • IAM Get Role

Set up the dataset

We start by cloning the project git repository, which contains the dataset we need to place in bucket A. We use a synthetic dataset, under Data/daily_dataset.csv. This dataset consists of energy meter readings for households. The file contains information like the number of measures, minimum, maximum, mean, median, sum, and std for each household on a daily basis. To create an S3 bucket (if you don’t already have one) and upload the data file, follow these steps:

  1. Clone the project repository locally by running the shell command:
    git clone https://github.com/aws-samples/amazon-keyspaces-emr-featurestore-kit.git

  2. On the Amazon S3 console, choose Create bucket.
  3. Give the bucket a name. For this post, we use featurestore-blogpost-bucket-xxxxxxxxxx (it’s helpful to append the account number to the bucket name to ensure the name is unique for common prefixes).
  4. Choose the Region you’re working in.
    It’s important that you create all resources in the same Region for this post.
  5. Public access is blocked by default, and we recommend that you keep it that way.
  6. Disable bucket versioning and encryption (we don’t need it for this post).
  7. Choose Create bucket.
  8. After the bucket is created, choose the bucket name and drag the folders Dataset and EMR into the bucket.

Set up Amazon Keyspaces

We need to generate credentials for Amazon Keyspaces, which we use to connect with the service. The steps for generating the credentials are as follows:

  1. On the IAM console, choose Users in the navigation pane.
  2. Choose an IAM user you want to generate credentials for.
  3. On the Security credentials tab, under Credentials for Amazon Keyspaces (for Apache Cassandra), choose Generate Credentials.
    A pop-up appears with the credentials, along with an option to download them. We recommend downloading a copy because you won’t be able to view the credentials again. We also need to create a table in Amazon Keyspaces to store our feature data. We have shared the schema for the keyspace and table in the GitHub project files Keyspaces/keyspace.cql and Keyspaces/Table_Schema.cql.
  4. On the Amazon Keyspaces console, choose CQL editor in the navigation pane.
  5. Enter the contents of the file Keyspaces/Keyspace.cql in the editor and choose Run command.
  6. Clear the contents of the editor, enter the contents of Keyspaces/Table_Schema.cql, and choose Run command.

Table creation is an asynchronous process, and you’re notified if the table is successfully created. You can also view it by choosing Tables in the navigation pane.

Set up an EMR cluster

Next, we set up an EMR cluster so we can run PySpark code to generate features. First, we need to set up a truststore password. A truststore file contains the application server’s trusted certificates, including public keys for other entities. This file is generated by the provided script, and we need to provide a password to protect it. Amazon Keyspaces provides encryption in transit and at rest to protect and secure data transmission and storage, and uses Transport Layer Security (TLS) to help secure connections with clients. To connect to Amazon Keyspaces using TLS, we need to download an Amazon digital certificate and configure the Python driver to use TLS. This certificate is stored in a trust store; when we retrieve it, we need to provide the correct password.

  1. In the file EMR/emr_bootstrap_script.sh, update the following line to a password you want to use:
    # Create a JKS keystore from the certificate
    PASS={your_truststore_password_here}

  2. To point the bootstrap script to the one we uploaded to Amazon S3, update the following line to reflect the S3 bucket we created earlier:
    # Copy the Cassandra Connector config
    aws s3 cp s3://{your-s3-bucket}/EMR/app.config /home/hadoop/app.config

  3. To update the app.config file to reflect the correct trust store password, in the file EMR/app.config, update the value for truststore-password to the value you set earlier:
    {
        ssl-engine-factory {
          class = DefaultSslEngineFactory
          truststore-path = "/home/hadoop/.certs/cassandra_keystore.jks"
          truststore-password = "{your_password_here}"
        }
    }

  4. In the file EMR/app.config, update the following lines to reflect the Region and the user name and password generated earlier:
    contact-points = ["cassandra.<your-region>.amazonaws.com:9142"]
    load-balancing-policy.local-datacenter = <your-region>
    ..
    auth-provider {
        class = PlainTextAuthProvider
        username = "{your-keyspace-username}"
        password = "{your-keyspace-password}"
    }

    We need to create default instance roles, which are needed to run the EMR cluster.

  5. Update the contents of the S3 bucket created in the prerequisites section by dragging the EMR folder into the bucket again.
  6. To create the default roles, run the create-default-roles command:
    aws emr create-default-roles

    Next, we create an EMR cluster. The following AWS CLI command creates a cluster with Hadoop, Spark 3.0, Livy, Hive, and JupyterHub installed. It also runs the bootstrapping script on the cluster to set up the connection to Amazon Keyspaces.

  7. Create the cluster with the following code. Provide the subnet ID used to start a Jupyter notebook instance associated with this cluster, the S3 bucket you created earlier, and the Region you’re working in. You can use the default subnet; to find it, navigate to VPC > Subnets and copy the default subnet ID.
    aws emr create-cluster --termination-protected --applications Name=Hadoop Name=Spark Name=Livy Name=Hive Name=JupyterHub --tags 'creator=feature-store-blogpost' --ec2-attributes '{"InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"your-subnet-id"}' --service-role EMR_DefaultRole --release-label emr-6.1.0 --log-uri 's3n://{your-s3-bucket}/elasticmapreduce/' --name 'emr_feature_store' --instance-groups '[{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"CORE","InstanceType":"m5.xlarge","Name":"Core - 2"},{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":2}]},"InstanceGroupType":"MASTER","InstanceType":"m5.xlarge","Name":"Master - 1"}]' --bootstrap-actions '[{"Path":"s3://{your-s3-bucket HERE}/EMR/emr_bootstrap_script.sh","Name":"Execute_bootstarp_script"}]' --scale-down-behavior TERMINATE_AT_TASK_COMPLETION --region your-region

    Lastly, we create an EMR notebook instance to run the PySpark notebook Feature Creation and loading-notebook.ipynb (included in the repo).

  8. On the Amazon EMR console, choose Notebooks in the navigation pane.
  9. Choose Create notebook.
  10. Give the notebook a name and choose the cluster emr_feature_store.
  11. Optionally, configure the additional settings.
  12. Choose Create notebook. It can take a few minutes before the notebook instance is up and running.
  13. When the notebook is ready, select the notebook and choose either Open JupyterLab or Open Jupyter.
  14. In the notebook instance, open the notebook Feature Creation and loading-notebook.ipynb (included in the repo) and change the kernel to PySpark.
  15. Follow the instructions in the notebook and run the cells one by one to read the data from Amazon S3, create features, and write these to Delta Lake and Amazon Keyspaces.

Performance testing

To test throughput for our online feature store, we run a simulation on the features we created. We simulate approximately 40,000 requests per second. Each request queries data for a specific key (an ID in our feature table). The process tasks do the following:

  • Initialize a connection to Amazon Keyspaces
  • Generate a random ID to query the data
  • Generate a CQL statement:
    SELECT * FROM feature_store.energy_data_features WHERE id=[list_of_ids[random_index between 0-5559]];

  • Start a timer
  • Send the request to Amazon Keyspaces
  • Stop the timer when the response from Amazon Keyspaces is received
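
Each worker's core loop resembles the following sketch, which uses the Cassandra Python driver with TLS and service-specific credentials; the certificate path, credentials, endpoint, and ID list are placeholders:

import random
import ssl
import time

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

# TLS setup required by Amazon Keyspaces; the Starfield root certificate is
# downloaded separately, and its path here is a placeholder.
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.load_verify_locations("sf-class2-root.crt")
ssl_context.verify_mode = ssl.CERT_REQUIRED

auth = PlainTextAuthProvider(username="keyspaces-user-at-111122223333",
                             password="your-keyspaces-password")
cluster = Cluster(["cassandra.eu-west-1.amazonaws.com"], port=9142,
                  auth_provider=auth, ssl_context=ssl_context)
session = cluster.connect()

# In practice the IDs are loaded from the feature table; integers are used
# here purely for illustration.
list_of_ids = list(range(5560))
latencies = []

for _ in range(1_000_000):
    feature_id = random.choice(list_of_ids)
    start = time.perf_counter()
    session.execute(
        "SELECT * FROM feature_store.energy_data_features WHERE id = %s",
        (feature_id,),
    )
    latencies.append((time.perf_counter() - start) * 1000)  # milliseconds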

To run the simulation, we start 245 parallel AWS Fargate tasks running on Amazon Elastic Container Service (Amazon ECS). Each task runs a Python script that makes 1 million requests to Amazon Keyspaces. Because our dataset only contains 5,560 unique IDs, we generate 1 million random numbers between 0 and 5,559 at the start of the simulation and query the corresponding ID for each request. The code to run the simulation is included in the folder Simulation. You can run the simulation in a SageMaker notebook instance by completing the following steps:

  1. On the Amazon SageMaker console, create a SageMaker notebook instance (or use an existing one). You can choose an ml.t3.large instance.
  2. Let SageMaker create an execution role for you if you don’t have one.
  3. Open the SageMaker notebook and choose Upload.
  4. Upload the Simulation folder from the repository. Alternatively, open a terminal window on the notebook instance and clone the repository https://github.com/aws-samples/amazon-keyspaces-emr-featurestore-kit.git.
  5. Follow the instructions and run the steps and cells in the Simulation/ECS_Simulation.ipynb notebook.
  6. On the Amazon ECS console, choose the cluster you provisioned with the notebook and choose the Tasks tab to monitor the tasks.

Each task writes the latency figures to a file and moves this to an S3 location. When the simulation ends, we collect all the data to get aggregated stats and plot charts.

In our setup, we set the capacity mode for Amazon Keyspaces to provisioned, with 40,000 read capacity units (RCUs). After we start the simulation, the RCUs rise to close to 40,000, and the simulation takes around an hour to finish, as illustrated in the following visualization.

The first analysis we present is the latency distribution for the 245 million requests made during the simulation. Here, the 99th percentile falls within single-digit millisecond latency, as we would expect.

Quantile | Latency (ms)
50% | 3.11
90% | 3.61
99% | 5.56
99.9% | 25.63

For the second analysis, we present the following time series charts for latency. The chart at the bottom shows the raw latency figures from all 245 workers. The chart above it plots the average and minimum latency across all workers, grouped over 1-second intervals. Here we can see that both the minimum and the average latency stay below 10 milliseconds throughout the simulation. The third chart from the bottom plots the maximum latency across all workers, grouped over 1-second intervals. This chart shows occasional spikes in latency, but nothing consistent to worry about. The top two charts are latency distributions; the one on the left plots all the data, and the one on the right plots the 99.9th percentile. Due to the presence of some outliers, the chart on the left shows a peak close to zero and a long-tailed distribution. After we remove these outliers, we can see in the chart on the right that 99.9% of requests complete in less than 5.5 milliseconds. This is a great result, considering we sent 245 million requests.

Cleanup

Some of the resources we created in this blog post incur costs if left running. Remember to terminate the EMR cluster, empty and delete the S3 bucket, and delete the Amazon Keyspaces table. Also delete the SageMaker and Amazon EMR notebooks. The Amazon ECS cluster is billed per task, so it doesn’t incur additional costs after the simulation tasks finish.

Conclusion

Amazon EMR, Amazon S3, and Amazon Keyspaces provide a flexible and scalable development experience for feature engineering. EMR clusters are easy to manage, and teams can share environments without compromising compute and storage capabilities. EMR bootstrapping makes it easy to install and test out new tools and quickly spin up environments to test out new ideas. Having the feature store split into offline and online store simplifies model training and deployment, and provides performance benefits.

In our testing, Amazon Keyspaces was able to handle peak read throughput within our desired requirement of single-digit millisecond latency. It’s also worth mentioning that we found on-demand mode adapted to the usage pattern, and we observed an improvement in read/write latency a couple of days after switching it on.

Another important consideration for latency-sensitive queries is row length. In our testing, tables with shorter rows had lower read latency. Therefore, it’s more efficient to split the data into multiple tables and make asynchronous calls to retrieve it from multiple tables.
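
For example, with the Cassandra Python driver you can issue the per-table reads asynchronously and gather the results, roughly as follows (table names are illustrative):

# Assumes an existing `session` connected to Amazon Keyspaces, as in the
# earlier sketch; table names are illustrative.
feature_tables = [
    "feature_store.energy_daily_features",
    "feature_store.energy_weekly_features",
]

def fetch_feature_vector(session, feature_id):
    # Fire all requests without blocking, then gather the results.
    futures = [
        session.execute_async(
            f"SELECT * FROM {table} WHERE id = %s", (feature_id,)
        )
        for table in feature_tables
    ]
    return [future.result().one() for future in futures]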

We encourage you to explore adding security features and adopting security best practices according to your needs and potential company standards.

If you found this post useful, check out Loading data into Amazon Keyspaces with cqlsh for tips on how to tune Amazon Keyspaces, and Orchestrate Apache Spark applications using AWS Step Functions and Apache Livy on how to build and deploy PySpark jobs.


About the authors

Shaheer Mansoor is a Data Scientist at AWS. His focus is on building machine learning platforms that can host AI solutions at scale. His interest areas are ML Ops, Feature Stores, Model Hosting and Model Monitoring.

Vadym Dolinin is a Machine Learning Architect in SumUp. He works with several teams on crafting the ML platform, which enables data scientists to build, deploy, and operate machine learning solutions in SumUp. Vadym has 13 years of experience in the domains of data engineering, analytics, BI, and ML.

Oliver Zollikofer is a Data Scientist at AWS. He enables global enterprise customers to build and deploy machine learning models, as well as architect related cloud solutions.

How William Hill migrated NoSQL workloads at scale to Amazon Keyspaces

Post Syndicated from Kunal Gautam original https://aws.amazon.com/blogs/big-data/how-william-hill-migrated-nosql-workloads-at-scale-to-amazon-keyspaces/

Social gaming and online sports betting are competitive environments. The game must be able to handle large volumes of unpredictable traffic while simultaneously promising zero downtime. In this domain, user retention is no longer just desirable, it’s critical. William Hill is a global online gambling company based in London, England, and it is the founding member of the UK Betting and Gaming Council. They share the mission to champion the betting and gaming industry and set world-class standards to ensure an enjoyable, fair, and safe betting and gambling experience for all of their customers. In sports betting, William Hill is an industry-leading brand, awarded prestigious industry titles such as the IGA Awards Sports Betting Operator of the Year in 2019, 2020, and 2022, and the SBC Awards Racing Sportsbook of the Year in 2019. William Hill was acquired in April 2021 by Caesars Entertainment, Inc. (NASDAQ: CZR), the largest casino-entertainment company in the US and one of the world’s most diversified casino-entertainment providers. At the heart of the William Hill gaming platform is a NoSQL database that maintains 100% uptime, scales in real time to handle millions of users or more, and provides users with a responsive and personalized experience across all of their devices.

In this post, we’ll discuss how William Hill moved their workload from Apache Cassandra to Amazon Keyspaces (for Apache Cassandra) with zero downtime using AWS Glue ETL.

William Hill was facing challenges regarding scalability, cluster instability, high operational costs, and manual patching and server maintenance. They were looking for a NoSQL solution that was scalable, highly available, and completely managed, letting them focus on providing a better user experience rather than maintaining infrastructure. William Hill Limited decided to move forward with Amazon Keyspaces, because it can run Apache Cassandra workloads on AWS using the same Cassandra application code and developer tools used today, without the need to provision, patch, or manage servers, or to install, maintain, or operate software.

Solution overview

William Hill Limited wanted to migrate their existing Apache Cassandra workloads to Amazon Keyspaces with a replication lag of minutes, with minimum migration costs and development efforts. Therefore, AWS Glue ETL was leveraged to deliver the desired outcome.

AWS Glue is a serverless data integration service that provides multiple benefits for migration:

  • No infrastructure to maintain; allocates the necessary computing power and runs multiple migration jobs simultaneously.
  • All-in-one pricing model that includes infrastructure and is 55% cheaper than other cloud data integration options.
  • No lock-in with the service; it’s possible to develop data migration pipelines in open-source Apache Spark (Spark SQL, PySpark, and Scala).
  • The migration pipeline can be scaled with confidence using Amazon Keyspaces and AWS Glue.
  • Built-in pipeline monitoring ensures continuity during the migration.
  • AWS Glue ETL jobs make it possible to perform bulk data extraction from Apache Cassandra and ingest it into Amazon Keyspaces.

In this post, we’ll take you through William Hill’s journey of building the migration pipeline from scratch to migrate the Apache Cassandra workload to Amazon Keyspaces, using AWS Glue ETL with the DataStax Spark Cassandra Connector.

For the purpose of this post, let’s look at a typical Cassandra Network setup on AWS and the mechanism used to establish the connection with AWS Glue ETL. The migration solution described also works for Apache Cassandra hosted on on-premises clusters.

Architecture overview

The architecture comprises the migration environment, which requires Amazon Keyspaces, AWS Glue, Amazon Simple Storage Service (Amazon S3), and the Apache Cassandra cluster. To avoid high CPU utilization or saturation on the Apache Cassandra cluster during migration, you might want to deploy another Cassandra data center to isolate your production workload from the migration workload and keep the migration seamless for your customers.

Amazon S3 is used for staging while migrating data from Apache Cassandra to Amazon Keyspaces. This minimizes the I/O load on the Cassandra cluster serving live production traffic if the upload to Amazon Keyspaces fails and must be retried.

Prerequisites

The Apache Cassandra cluster is hosted on Amazon Elastic Compute Cloud (Amazon EC2) instances, spread across three Availability Zones, and hosted in private subnets. AWS Glue ETL runs inside an Amazon Virtual Private Cloud (Amazon VPC), so an AWS Glue Studio custom connector and connection must be set up to communicate with the Apache Cassandra nodes hosted in the private subnets of the customer VPC. This enables the connection to the Cassandra cluster hosted in the VPC. The DataStax Spark Cassandra Connector must be downloaded and saved to an Amazon S3 bucket: s3://$MIGRATION_BUCKET/jars/spark-cassandra-connector-assembly_2.12-3.2.0.jar.

Let’s create an AWS Glue Studio custom connector named cassandra_connection and its corresponding connection named conn-cassandra-custom for the AWS Region us-east-1.

For the connector you created, create an AWS Glue Studio connection and populate it with the network information (a VPC and a subnet) that allows AWS Glue ETL to establish a connection with Apache Cassandra:

  • Name: conn-cassandra-custom
  • Network options: the VPC and private subnet where the Cassandra cluster is hosted

Let’s begin by creating a target keyspace named target_keyspace and a target table named target_table in Amazon Keyspaces, using either the Amazon Keyspaces console or cqlsh.

CREATE KEYSPACE target_keyspace WITH replication = {'class': 'SingleRegionStrategy'};

CREATE TABLE target_keyspace.target_table (
    userid      uuid,
    level       text,
    gameid      int,
    description text,
    nickname    text,
    zip         text,
    email       text,
    updatetime  text,
    PRIMARY KEY (userid, level, gameid)
) WITH default_time_to_live = 0 AND CUSTOM_PROPERTIES = {
	'capacity_mode':{
		'throughput_mode':'PROVISIONED',
		'write_capacity_units':76388,
		'read_capacity_units':3612
	}
} AND CLUSTERING ORDER BY (level ASC, gameid ASC);

After the table has been created, switch it to on-demand mode to pre-warm the table and avoid AWS Glue ETL job throttling failures. The following statement updates the throughput mode.

ALTER TABLE target_keyspace.target_table 
WITH CUSTOM_PROPERTIES = {
	'capacity_mode':{
		'throughput_mode':'PAY_PER_REQUEST'
	}
} 

Let’s go ahead and create two Amazon S3 buckets to support the migration process. The first bucket (s3://your-spark-cassandra-connector-bucket-name) stores the Spark Cassandra connector assembly JAR file and the Cassandra and Keyspaces configuration YAML files.

The second bucket (s3://your-migration-stage-bucket-name) stores intermediate Parquet files that capture the delta between the Cassandra cluster and the Amazon Keyspaces table, tracking changes between subsequent executions of the AWS Glue ETL jobs.

In the following KeyspacesConnector.conf, set your contact points to connect to Amazon Keyspaces, and replace the username and password with your own credentials.

Using the RateLimitingRequestThrottler, we can make sure that requests don’t exceed the configured Amazon Keyspaces capacity. The G.1X DPU creates one executor per worker. The RateLimitingRequestThrottler in this example is set to 1,000 requests per second; with this configuration and G.1X DPUs, you’ll achieve 1,000 requests per second per AWS Glue worker. Adjust max-requests-per-second accordingly to fit your workload, and increase the number of workers to scale throughput to a table.

datastax-java-driver {
  basic.request.consistency = "LOCAL_QUORUM"
  basic.contact-points = ["cassandra.us-east-1.amazonaws.com:9142"]
   advanced.reconnect-on-init = true
   basic.load-balancing-policy {
        local-datacenter = "us-east-1"
    }
    advanced.auth-provider = {
       class = PlainTextAuthProvider
       username = "user-at-sample"
       password = "S@MPLE=PASSWORD="
    }
    advanced.throttler = {
       class = RateLimitingRequestThrottler
       max-requests-per-second = 1000
       max-queue-size = 50000
       drain-interval = 1 millisecond
    }
    advanced.ssl-engine-factory {
      class = DefaultSslEngineFactory
      hostname-validation = false
    }
    advanced.connection.pool.local.size = 1
}

Similarly, create a CassandraConnector.conf file, set the contact points to connect to the Cassandra cluster, and replace the username and password accordingly.

datastax-java-driver {
  basic.request.consistency = "LOCAL_QUORUM"
  basic.contact-points = ["127.0.0.1:9042"]
   advanced.reconnect-on-init = true
   basic.load-balancing-policy {
        local-datacenter = "datacenter1"
    }
    advanced.auth-provider = {
       class = PlainTextAuthProvider
       username = "user-at-sample"
       password = "S@MPLE=PASSWORD="
    }
}

Build AWS Glue ETL migration pipeline with Amazon Keyspaces

To build a reliable, consistent delta upload pipeline, let’s decouple the migration process into two AWS Glue ETL jobs:

  • CassandraToS3 Glue ETL: Reads data from the Apache Cassandra cluster and transfers the migration workload to Amazon S3 in Apache Parquet format. To identify incremental changes in the Cassandra tables, the job stores separate Parquet files containing the primary keys and an updated timestamp.
  • S3toKeyspaces Glue ETL: Uploads the migration workload from Amazon S3 to Amazon Keyspaces. During the first run, the ETL uploads the complete dataset from Amazon S3 to Amazon Keyspaces; on subsequent runs, it calculates the incremental changes by comparing the updated timestamps across two consecutive runs. The job also takes care of inserting new records, updating existing records, and deleting records based on the incremental difference.

In this example, we’ll use Scala to write the AWS Glue ETL, but you can also use PySpark.

Let’s go ahead and create an AWS Glue ETL job named CassandraToS3 with the following job parameters:

aws glue create-job \
    --name "CassandraToS3" \
    --role "GlueKeyspacesMigration" \
    --description "Offload data from the Cassandra to S3" \
    --glue-version "3.0" \
    --number-of-workers 2 \
    --worker-type "G.1X" \
    --connections "conn-cassandra-custom" \
    --command "Name=glueetl,ScriptLocation=s3://$MIGRATION_BUCKET/scripts/CassandraToS3.scala" \
    --max-retries 0 \
    --default-arguments '{
        "--job-language":"scala",
        "--KEYSPACE_NAME":"source_keyspace",
        "--TABLE_NAME":"source_table",
        "--S3_URI_FULL_CHANGE":"s3://$MIGRATION_BUCKET/full-dataset/",
        "--S3_URI_CURRENT_CHANGE":"s3://$MIGRATION_BUCKET/incremental-dataset/current/",
        "--S3_URI_NEW_CHANGE":"s3://$MIGRATION_BUCKET/incremental-dataset/new/",
        "--extra-files":"s3://$MIGRATION_BUCKET/conf/CassandraConnector.conf",
        "--conf":"spark.cassandra.connection.config.profile.path=CassandraConnector.conf",
        "--class":"GlueApp"
    }'

The CassandraToS3 Glue ETL job reads data from the Apache Cassandra table source_keyspace.source_table and writes it to the S3 bucket in Apache Parquet format. The job rotates the Parquet files to help identify delta changes in the data between consecutive job executions. To identify inserts, updates, and deletes, you must know the primary key and the columns’ write times (updated timestamps) in the Cassandra cluster up front. Our primary key consists of the columns userid, level, and gameid, and we use the write time column updatetime. If you have multiple updated columns, you must use more than one write time column with an aggregation function. For example, for email and updatetime, take the maximum of the write times for email and updatetime.

The following AWS Glue Spark code offloads data to Amazon S3 using the spark-cassandra-connector. The script takes five parameters: KEYSPACE_NAME, TABLE_NAME, S3_URI_FULL_CHANGE, S3_URI_CURRENT_CHANGE, and S3_URI_NEW_CHANGE.
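The original job script isn’t reproduced here, so the following is a minimal sketch of what the offload step can look like, assuming the same column layout as the sample table shown earlier (userid, level, gameid, and updatetime) and the job parameters defined in the create-job call that follows; treat it as an illustration rather than the exact production code.

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.{GlueArgParser, Job}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq(
      "JOB_NAME", "KEYSPACE_NAME", "TABLE_NAME",
      "S3_URI_FULL_CHANGE", "S3_URI_CURRENT_CHANGE", "S3_URI_NEW_CHANGE").toArray)

    val glueContext = new GlueContext(new SparkContext())
    val spark: SparkSession = glueContext.getSparkSession
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Read the source table through the connection profile passed with
    // --conf spark.cassandra.connection.config.profile.path=CassandraConnector.conf
    val sourceDf = spark.read
      .format("org.apache.spark.sql.cassandra")
      .option("keyspace", args("KEYSPACE_NAME"))
      .option("table", args("TABLE_NAME"))
      .load()

    // Full snapshot of the table, written as Parquet
    sourceDf.write.mode("overwrite").parquet(args("S3_URI_FULL_CHANGE"))

    // Primary keys plus the updated timestamp; the S3toKeyspaces job compares this
    // "new" snapshot with the previous ("current") one to compute the delta
    sourceDf.select("userid", "level", "gameid", "updatetime")
      .write.mode("overwrite").parquet(args("S3_URI_NEW_CHANGE"))

    Job.commit()
  }
}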

To upload the data from Amazon S3 to Amazon Keyspaces, you must create an S3toKeyspaces Glue ETL job that reads the Parquet files from the Amazon S3 bucket produced by the CassandraToS3 Glue job, identifies inserts, updates, and deletes, and executes the corresponding requests against the target table in Amazon Keyspaces. The job takes the same five parameters: KEYSPACE_NAME, TABLE_NAME, S3_URI_FULL_CHANGE, S3_URI_CURRENT_CHANGE, and S3_URI_NEW_CHANGE.
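Again, the full script is beyond the scope of this extract; the following sketch shows only the delta logic, assuming the same key columns and the Glue boilerplate from the previous example (argument parsing, GlueContext, and SparkSession initialization). Deletes, which the job also handles, would be derived from the keys present in the current snapshot but missing from the new one.

import org.apache.spark.sql.{SaveMode, SparkSession}

object S3toKeyspacesDelta {
  // Sketch of the delta upload step; spark and args come from the same
  // Glue job boilerplate shown in the CassandraToS3 example
  def uploadDelta(spark: SparkSession, args: Map[String, String]): Unit = {
    val fullDf   = spark.read.parquet(args("S3_URI_FULL_CHANGE"))
    val newKeys  = spark.read.parquet(args("S3_URI_NEW_CHANGE"))
    val currKeys = spark.read.parquet(args("S3_URI_CURRENT_CHANGE"))

    // Keys whose updatetime changed, or that are brand new, since the previous run
    val changedKeys = newKeys.join(currKeys,
      Seq("userid", "level", "gameid", "updatetime"), "left_anti")

    // Full records for the changed keys; writing them back to the same primary
    // key in Amazon Keyspaces acts as an upsert (covers inserts and updates)
    val delta = fullDf.join(changedKeys, Seq("userid", "level", "gameid"), "left_semi")

    delta.write
      .format("org.apache.spark.sql.cassandra")
      .option("keyspace", args("KEYSPACE_NAME"))
      .option("table", args("TABLE_NAME"))
      .mode(SaveMode.Append)
      .save()
  }
}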

Let’s go ahead and create our second AWS Glue ETL job S3toKeyspaces with the following job parameters:

aws glue create-job \
    --name "S3toKeyspaces" \
    --role "GlueKeyspacesMigration" \
    --description "Push data to Amazon Keyspaces" \
    --glue-version "3.0" \
    --number-of-workers 2 \
    --worker-type "G.1X" \
    --command "Name=glueetl,ScriptLocation=s3://amazon-keyspaces-backups/scripts/S3toKeyspaces.scala" \
    --default-arguments '{
        "--job-language":"scala",
        "--KEYSPACE_NAME":"target_keyspace",
        "--TABLE_NAME":"target_table",
        "--S3_URI_FULL_CHANGE":"s3://$MIGRATION_BUCKET/full-dataset/",
        "--S3_URI_CURRENT_CHANGE":"s3://$MIGRATION_BUCKET/incremental-dataset/current/",
        "--S3_URI_NEW_CHANGE":"s3://$MIGRATION_BUCKET/incremental-dataset/new/",
        "--extra-files":"s3://$MIGRATION_BUCKET/conf/KeyspacesConnector.conf",
        "--conf":"spark.cassandra.connection.config.profile.path=KeyspacesConnector.conf",
        "--class":"GlueApp"
    }'

Job scheduling

The final step is to configure AWS Glue triggers or Amazon EventBridge, depending on your scheduling needs, to start the S3toKeyspaces Glue ETL job when the CassandraToS3 job has succeeded. If you want to run CassandraToS3 on a schedule, configure a scheduled trigger; for example, a cron-based trigger can run CassandraToS3 every 15 minutes.

Job tuning

The following Spark settings are recommended starting points for Amazon Keyspaces; you can increase them later as appropriate for your workload.

  • Use a Spark partition size (which groups multiple Cassandra rows) smaller than 8 MB to avoid replaying large Spark tasks during a task failure.
  • Use a low concurrent number of writes per DPU with a large number of retries. Add the following options to the job parameters: --conf spark.cassandra.query.retry.count=500 --conf spark.cassandra.output.concurrent.writes=3.
  • Set spark.task.maxFailures to a bounded value. For example, you can start from 32 and increase as needed. This option can help you increase the number of task retries during the table pre-warm stage. Add the following option to the job parameters: --conf spark.task.maxFailures=32
  • Another recommendation is to turn off batching to improve random access patterns. Add the following options to the job parameters:
    spark.cassandra.output.batch.size.rows=1
    spark.cassandra.output.batch.grouping.key=none
    spark.cassandra.output.batch.grouping.buffer.size=100
  • Randomize your workload. Amazon Keyspaces partitions data using partition keys. Although Amazon Keyspaces has built-in logic to help load balance requests for the same partition key, loading the data is faster and more efficient if you randomize the order, because you can take advantage of the built-in load balancing of writing to different partitions. To spread the writes across the partitions evenly, you must randomize the data in the DataFrame. You might use a rand function to shuffle the rows in the DataFrame, as shown in the sketch after this list.
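For example, assuming a DataFrame named df that holds the rows to be written, a simple way to randomize the write order before handing the data to the connector is:

import org.apache.spark.sql.functions.rand

// Shuffle the rows so that writes are spread evenly across Keyspaces partitions
val randomizedDf = df.orderBy(rand())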

Summary

William Hill was able to migrate their workload from Apache Cassandra to Amazon Keyspaces at scale using AWS Glue, without the need to make any changes to their application tech stack. The adoption of Amazon Keyspaces has given them the headroom to focus on their application and customer experience, because with Amazon Keyspaces there’s no need to manage servers, and they get a highly scalable, secure solution with performance at scale and the ability to handle sudden spikes in demand.

In this post, you saw how to use AWS Glue to migrate the Cassandra workload to Amazon Keyspaces while keeping your Cassandra source databases completely functional during the migration process. When your applications are ready, you can cut them over to Amazon Keyspaces with minimal replication lag, on the order of minutes, between the Cassandra cluster and Amazon Keyspaces. You can also use a similar pipeline to replicate the data back to the Cassandra cluster from Amazon Keyspaces to maintain data consistency, if needed. Here you can find the documents and code to help accelerate your migration to Amazon Keyspaces.


About the Authors

Nikolai Kolesnikov is a Senior Data Architect and helps AWS Professional Services customers build highly-scalable applications using Amazon Keyspaces. He also leads Amazon Keyspaces ProServe customer engagements.

Kunal Gautam is a Senior Big Data Architect at Amazon Web Services. Having experience in building his own Startup and working along with enterprises, he brings a unique perspective to get people, business and technology work in tandem for customers. He is passionate about helping customers in their digital transformation journey and enables them to build scalable data and advance analytics solutions to gain timely insights and make critical business decisions. In his spare time, Kunal enjoys Marathons, Tech Meetups and Meditation retreats.

How GE Proficy Manufacturing Data Cloud replatformed to improve TCO, data SLA, and performance

Post Syndicated from Jyothin Madari original https://aws.amazon.com/blogs/big-data/how-ge-proficy-manufacturing-data-cloud-replatformed-to-improve-tco-data-sla-and-performance/

This post is co-authored by Jyothin Madari, Madhusudhan Muppagowni, and Ayush Srivastava from GE.

GE Proficy Manufacturing Data Cloud (MDC), part of GE Digital’s Manufacturing Execution Systems (MES) suite of solutions, allows GE Digital’s customers to quickly and easily increase the value they derive from the MES by reliably bringing enterprise-wide manufacturing data into the cloud and transforming it into a structured dataset for advanced analytics and deeper insights into manufacturing processes.

In this post, we share how MDC modernized its hybrid cloud strategy by replatforming. This solution improved scalability, the data availability service level agreement (SLA), and performance.

Challenge

MDC v1 was built on Predix services optimized for industrial use cases, such as Predix Columnar Store (Cassandra) and Predix Insights (Amazon EMR). MDC has evolved in both features and the underlying platform over the past year, with a goal of improving TCO, data SLA, and performance. MDC’s customer base grew, and the number of customer sites increased to over 100 in the past couple of years. The increased number of sites needed more compute and storage capacity, which increased infrastructure and operational cost significantly while increasing data latency and reducing the freshness of data delivered from the cloud.

How we started

MDC evaluated several vendors for their storage and compute capabilities using various measurements: security, performance, scalability, ease of management and operation, reduction of overall cost and increase in ROI, partnership, and migration help (technology assistance). The MDC team saw opportunities to improve the product by using native AWS services such as Amazon Redshift, AWS Glue, and Amazon Managed Workflows for Apache Airflow (Amazon MWAA), which made the product more performant and scalable while reducing operation costs and making it future-ready for advanced analytics and new customer use cases.

The GE Digital team, comprised of domain experts, developers, and QA, worked shoulder to shoulder with the AWS ProServe team, comprised of Solution Architects, Data Architects, and Big Data Experts, in determining the key architectural changes required and solutions to implementation challenges.

Overview of solution

The following diagram illustrates the high-level architecture of the solution.

This is a broad overview, and the specifics of networking and security between components are out of scope for this post.

The solution includes the following main steps and components:

  1. CDC and log collector – Compressed CSV data is collected from over 100 manufacturing data sources (Proficy Plant Applications) and delivered to an Amazon Simple Storage Service (Amazon S3) bucket.
  2. S3 raw bucket – Our data lands in Amazon S3 without any transformation, but appropriately partitioned (tenant, site, date, and so on) for the ease of future processing.
  3. AWS Lambda – When the file lands in the S3 raw bucket, it triggers an S3 event notification, which invokes AWS Lambda. Lambda extracts metadata (bucket name, key name, date, and so on) from the event and saves it in Amazon DynamoDB.
  4. AWS Glue – Our goal is now to take CSV files, with varying schemas, and convert them into Apache Parquet format. An AWS Glue extract, transform, and load (ETL) job reads a list of files to be processed from the DynamoDB table and fetches them from the S3 raw bucket. We have preconfigured unified AVRO schemas in the AWS Glue Schema Registry for schema conversion. Converted data lands in the S3 raw Parquet bucket.
  5. S3 raw Parquet bucket – Data in this bucket is still raw and unmodified; only the format was changed. This intermediary storage is required due to schema and column order mismatch in CSV files.
  6. Amazon Redshift – The majority of transformations and data enrichment happens in this step. Amazon Redshift Spectrum consumes data from the S3 raw Parquet bucket and external PostgreSQL dimension tables (through a federated query). Transformations are performed via stored procedures, where we encapsulate logic for data transformation, data validation, and business-specific logic. The Amazon Redshift cluster is configured with concurrency scaling, auto workload management (WLM) with caching, and the latest RA3 instance types.
  7. MDC API – These custom-built, web-based, REST API microservices talk on the backend with Amazon Redshift and expose data to external users, business intelligence (BI) tools, and partners.
  8. Amazon Redshift data export and archival – On a scheduled basis, Amazon Redshift exports (UNLOAD command) contextualized and business-defined aggregated data. Exports are landed in the S3 bucket as Apache Parquet files.
  9. S3 Parquet export bucket – This bucket stores the exported data (hundreds of TBs) used by external users who need to run extensive, heavy analytics and AI or machine learning (ML) with various tools (such as Amazon EMR, Amazon Athena, Apache Spark, and Dremio).
  10. End-users – External users consume data from the API. The main use case here is reporting and visual analytics.
  11. Amazon MWAA – The orchestrator of the solution, Amazon MWAA is used for scheduling Amazon Redshift stored procedures, AWS Glue ETL jobs, and Amazon Redshift exports at regular intervals with error handling and retries built in.

Bringing it all together

MDC replaced both Predix Columnar Store (Cassandra) and Predix Insights (Amazon EMR) with Amazon Redshift for both storage of the MDC data models and compute (ELT). Amazon MWAA is used to schedule the workloads that do the bulk of the ELT. Lambda, AWS Glue, and DynamoDB are used to normalize the schema differences between sites. It was important not to disrupt MDC customers while replatforming. To achieve this, MDC used a phased approach to migrate the data models to Amazon Redshift. They used federated queries to query existing PostgreSQL for dimensional data, which facilitated having some of the data models in Amazon Redshift, while the others were in Cassandra with no interruption to MDC customers. Redshift Spectrum facilitated querying the raw data in Amazon S3 directly both for ETL and data validation.

75% of the MDC team along with the AWS ProServe team and AWS Solution Architects collaborated with the GE Digital Security Team and Platform Team to implement the architecture with AWS native services. It took approximately 9 months to implement, secure, and performance tune the architecture and migrate data models in three phases. Each phase has gone through a GE Digital internal security review. Amazon Redshift Auto WLM, short query acceleration, and tuning the sort keys to optimize querying patterns improved the Proficy MDC API performance. Because the unload of the data from Amazon Redshift was fast, Proficy MDC is now able to export the data much more frequently to our end customers.

Conclusion

With replatforming, Proficy MDC was able to improve ETL performance by approximately 75%. Data latency and freshness improved by approximately 87%. The solution reduced the TCO of the platform by approximately 50%, and Proficy MDC was also able to reduce infrastructure and operational costs. Improved performance and reduced latency have allowed us to speed up the next steps in our journey to modernize the enterprise data architecture and hybrid cloud data platform.


About the Authors

Jyothin Madari leads the Manufacturing Data Cloud (MDC) engineering team; part of the manufacturing suite of products at GE Digital. He has 18 years of experience, 4 of which is with GE Digital. Most recently he has been working on data migration projects with an aim to reduce costs and improve performance. He is an AWS Certified Cloud Practitioner, a keen learner and loves solving interesting problems. Connect with him on LinkedIn.

Madhusudhan (Madhu) Muppagowni is a Technical Architect and Principal Software Developer based in Silicon Valley, Bay Area, California.  He is passionate about Software Development and Architecture. He thrives on producing Well-Architected and Secure SaaS Products, Data Pipelines that can make a real impact.  He loves outdoors and an avid hiker and backpacker. Connect with him on LinkedIn.

Ayush Srivastava is a Senior Staff Engineer and Technical Anchor based in Hyderabad, India. He is passionate about Software Development and Architecture. He has Demonstrated track record of successfully technical anchoring small to large Secure SaaS Products, Data Pipelines from start to finish. He loves exploring different places and he says “I’m in love with cities I have never been to and people I have never met.” Connect with him on LinkedIn.

Karen Grygoryan is Data Architect with AWS ProServe. Connect with him on LinkedIn.

Gnanasekaran Kailasam is a Data Architect at AWS. He has worked with building data warehouses and big data solutions for over 16 years. He loves to learn new technologies and solving, automating, and simplifying customer problems with easy-to-use cloud data solutions on AWS. Connect with him on LinkedIn.

Supercharging Dream11’s Data Highway with Amazon Redshift RA3 clusters

Post Syndicated from Dhanraj Gaikwad original https://aws.amazon.com/blogs/big-data/supercharging-dream11s-data-highway-with-amazon-redshift-ra3-clusters/

This is a guest post by Dhanraj Gaikwad, Principal Engineer on Dream11 Data Engineering team.

Dream11 is the world’s largest fantasy sports platform, with over 120 million users playing fantasy cricket, football, kabaddi, basketball, hockey, volleyball, handball, rugby, futsal, American football, and baseball. Dream11 is the flagship brand of Dream Sports, India’s leading Sports Technology company, and has partnerships with several national and international sports bodies and cricketers.

In this post, we look at how we supercharged our data highway, the backbone of our major analytics pipeline, by migrating our Amazon Redshift clusters to RA3 nodes. We also look at why we were excited about this migration, the challenges we faced during the migration and how we overcame them, as well as the benefits accrued from the migration.

Background

The Dream11 Data Engineering team runs the analytics pipelines (what we call our Data Highway) across Dream Sports. In near-real time, we analyze various aspects that directly impact the end-user experience, which can have a profound business impact for Dream11.

Initially, we were analyzing upwards of terabytes of data per day with Amazon Redshift clusters that ran mainly on dc2.8xlarge nodes. However, due to a rapid increase in our user participation over the last few years, we observed that our data volumes increased multi-fold. Because we were using dc2.8xlarge clusters, this meant adding more nodes of dc2.8xlarge instance types to the Amazon Redshift clusters. Not only was this increasing our costs, it also meant that we were adding additional compute power when what we really needed was more storage. Because we anticipated significant growth during the Indian Premier League (IPL) 2021, we actively explored various options using our AWS Enterprise Support team. Additionally, we were expecting more data volume over the next few years.

The solution

After discussions with AWS experts and the Amazon Redshift product team, we at Dream11 were recommended the most viable option of migrating our Amazon Redshift clusters from dc2.8xlarge to the newer RA3 nodes. The most obvious reason for this was the decoupling of storage from compute. As a result, we could use fewer nodes and move our storage to Amazon Redshift managed storage. This allowed us to respond to data volume growth in the coming years as well as reduce our costs.

To start off, we conducted a few elementary tests using an Amazon Redshift RA3 test cluster. After we were convinced that this wouldn’t require many changes in our Amazon Redshift queries, we decided to carry out a complete head-to-head performance test between the two clusters.

Validating the solution

Because the user traffic on the Dream11 app tends to spike during big ticket tournaments like the IPL, we wanted to ensure that the RA3 clusters could handle the same traffic that we usually experience during our peak. The AWS Enterprise Support team suggested using the Simple Replay tool, an open-source tool released by AWS that you can use to capture queries on a source Amazon Redshift cluster and then replay the same queries on a destination Amazon Redshift cluster (or clusters). We decided to use this tool to capture our performance test queries on the existing dc2.8xlarge clusters and replay them on a test Amazon Redshift cluster composed of RA3 nodes. At the time of our experimentation, the newer version of the automated AWS CloudFormation-based toolset (now on GitHub) was not available.

Challenges faced

The first challenge came up when using the Simple Replay tool because there was no easy way to compare the performance of like-to-like queries on the two types of clusters. Although Amazon Redshift provides various statistics using meta-tables about individual queries and their performance, the Simple Replay tool adds additional comments in each Amazon Redshift query on the target cluster to make it easier to know if these queries were run by the Simple Replay tool. In addition, the Simple Replay tool drops comments from the queries on the source cluster.

Comparing each query performance with the Amazon Redshift performance test suite would mean writing additional scripts for easy performance comparison. An alternative would have been to modify the Simple Replay tool code, because it’s open source on GitHub. However, with the IPL 2022 beginning in just a few days, we had to explore another option urgently.

After further discussions with the AWS Enterprise Support team, we decided to use two test clusters: one with the old dc2.8xlarge nodes, and another with the newer RA3 nodes. The idea was to use the Simple Replay tool to run the captured queries from our original cluster on both test clusters. This meant that the queries would be identical on both test clusters, making it easier to compare. Although this meant running an additional test cluster for a few days, we went ahead with this option. As a side note, the newer automated AWS CloudFormation-based toolset does exactly the same in an automated way.

After we were convinced that most of our Amazon Redshift queries performed satisfactorily, we noticed that certain queries were performing slower on the RA3-based cluster than the dc2.8xlarge cluster. We narrowed down the problem to SQL queries with full table scans. We rectified it by following proper data modelling practices in the ETL workflow. Then we were ready to migrate to the newer RA3 nodes.

The migration to RA3

The migration from the old cluster to the new cluster was smoother than we thought. We used the elastic resize approach, which meant we only had a few minutes of Amazon Redshift downtime. We completed the migration successfully with a sufficient buffer timeline for more tests. Additional tests indicated that the new cluster performed how we wanted it to.

The trial by fire

The new cluster performed satisfactorily during our peak performance loads in the IPL as well as the following ICC T20 Cricket World Cup. We’re excited that the new RA3 node-based Amazon Redshift cluster can support our data volume growth needs without needing to increase the number of instance nodes.

We migrated from dc2 to RA3 in April 2021. The data volume has grown by 50% since then. If we had continued with dc2 instances, the cluster cost would have increased by 50%. However, because of the migration to RA3 instances, even with an increase in data volume by 50% since April 2021, the cluster cost has increased by 0.7%, which is attributed to an increase in storage cost.

Conclusion

Migrating to the newer RA3-based Amazon Redshift cluster helped us decouple our computing needs from our storage needs, and now we’re prepared for our expected data volume growth for the next few years. Moreover, we don’t need to add compute nodes if we only need storage, which is expected to bring down our costs in the long run. We did need to fine-tune some of our queries on the newer cluster. With the Simple Replay tool, we could do a direct comparison between the older and the newer cluster. You can also use the newer automated AWS CloudFormation-based toolset if you want to follow a similar approach.

We highly recommend RA3 instances. They give you the flexibility to size your RA3 cluster based on the amount of data stored without increasing your compute costs.


About the Authors

Dhanraj Gaikwad is a Principal Data Engineer at Dream11. Dhanraj has more than 15 years of experience in the field of data and analytics. In his current role, Dhanraj is responsible for building the data platform for Dream Sports and is specialized in data warehousing, including data modeling, building data pipelines, and query optimizations. He is passionate about solving large-scale data problems and taking unique approaches to deal with them.

Sanket Raut is a Principal Technical Account Manager at AWS based in Vasai, India. Sanket has more than 16 years of industry experience, including roles in cloud architecture, systems engineering, and software design. He currently focuses on enabling large startups to streamline their cloud operations and optimize their cloud spend. His area of interest is serverless technologies.

How Paytm modernized their data pipeline using Amazon EMR

Post Syndicated from Rajat Bhardwaj original https://aws.amazon.com/blogs/big-data/how-paytm-modernized-their-data-pipeline-using-amazon-emr/

This post was co-written by Rajat Bhardwaj, Senior Technical Account Manager at AWS and Kunal Upadhyay, General Manager at Paytm.

Paytm is India’s leading payment platform, pioneering the digital payment era in India with 130 million active users. Paytm operates multiple lines of business, including banking, digital payments, bill recharges, e-wallet, stocks, insurance, lending and mobile gaming. At Paytm, the Central Data Platform team is responsible for turning disparate data from multiple business units into insights and actions for their executive management and merchants, who are small, medium or large business entities accepting payments from the Paytm platforms.

The Data Platform team modernized their legacy data pipeline with AWS services. The data pipeline collects data from different sources and runs analytical jobs, generating approximately 250K reports per day, which are consumed by Paytm executives and merchants. The legacy data pipeline was set up on premises using a proprietary solution and didn’t utilize the open-source Hadoop stack components such as Spark or Hive. This legacy setup was resource-intensive, having high CPU and I/O requirements. Analytical jobs took approximately 8–10 hours to complete, which often led to Service Level Agreements (SLA) breaches. The legacy solution was also prone to outages due to higher than expected hardware resource consumption. Its hardware and software limitations impacted the ability of the system to scale during peak load. Data models used in the legacy setup processed the entire data every time, which led to an increased processing time.

In this post, we demonstrate how the Paytm Central Data Platform team migrated their data pipeline to AWS and modernized it using Amazon EMR, Amazon Simple Storage Service (Amazon S3) and underlying AWS Cloud infrastructure along with Apache Spark. We optimized the hardware usage and reduced the data analytical processing, resulting in shorter turnaround time to generate insightful reports, all while maintaining operational stability and scale irrespective of the size of daily ingested data.

Overview of solution

The key to modernizing a data pipeline is to adopt an optimal incremental approach, which helps reduce the end-to-end cycle to analyze the data and get meaningful insights from it. To achieve this state, it’s vital to ingest incremental data into the pipeline, process delta records, and reduce the analytical processing time. We configured the data sources to inherit the unchanged records and tuned the Spark jobs to analyze only the newly inserted or updated records. We used temporal data columns to store the incremental datasets until they’re processed. Data-intensive Spark jobs are configured in incremental, on-demand deduplicating mode to process the data. This helps eliminate redundant data tuples from the data lake and reduces the total data volume, which saves compute and storage capacity. We also optimized the scanning of raw tables to restrict the scans to only the changed record set, which reduced scanning time by approximately 90%. Incremental data processing also helps reduce the total processing time.
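The job code itself isn’t part of this post, but the following sketch illustrates the incremental pattern described above; the updated_at column, the S3 paths, the watermark value, and the transaction_id key are all hypothetical.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("incremental-report").getOrCreate()

// Watermark persisted by the previous run (hypothetical value and storage)
val lastProcessedAt = "2022-01-01 00:00:00"

// Scan only the delta instead of the full history (hypothetical path and column)
val delta = spark.read.parquet("s3://data-lake/events/")
  .filter(col("updated_at") > lastProcessedAt)

// On-demand deduplication: keep a single tuple per business key
val deduped = delta.dropDuplicates("transaction_id")

deduped.write.mode("append").parquet("s3://data-lake/processed/")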

At the time of this writing, the existing data pipeline has been operationally stable for 2 years. Although this modernization was vital, there is a risk of an operational outage while the changes are being implemented. Data skewing needs to be handled in the new system by an appropriate scaling strategy. Zero downtime is expected from the stakeholders because the reports generated from this system are vital for Paytm’s CXO, executive management and merchants on a daily basis.

The following diagram illustrates the data pipeline architecture.

Benefits of the solution

The Paytm Central Data Office team, comprised of 10 engineers, worked with the AWS team to modernize the data pipeline. The team worked for approximately 4 months to complete this modernization and migration project.

Modernizing the data pipeline with Amazon EMR 6.3 helped efficiently scale the system at a lower cost. Amazon EMR managed scaling helped reduce the scale-in and scale-out time and increase the usage of Amazon Elastic Compute Cloud (Amazon EC2) Spot Instances for running the Spark jobs. Paytm is now able to utilize a Spot to On-Demand ratio of 80:20, resulting in higher cost savings. Amazon EMR managed scaling also helped automatically scale the EMR cluster based on YARN memory usage with the desired type of EC2 instances. This approach eliminates the need to configure multiple Amazon EMR scaling policies tied to specific types of EC2 instances as per the compute requirements for running the Spark jobs.

In the following sections, we walk through the key tasks to modernize the data pipeline.

Migrate over 400 TB of data from the legacy storage to Amazon S3

The Paytm team built a proprietary data migration application using the open-source AWS SDK for Java for Amazon S3 and the Scala programming language. This application can connect to multiple cloud providers and on-premises data centers, and migrate the data to a central data lake built on Amazon S3.

Modernize the transformation jobs for over 40 data flows

Data flows are defined in the system for ingesting raw data, preprocessing the data, and aggregating the data that is used by the analytical jobs for report generation. Data flows are developed in the Scala programming language on Apache Spark. We use an Azkaban batch workflow job scheduler for ordering and tracking the Spark job runs. Workflows are created on Amazon EMR to schedule these Spark jobs multiple times during a day. We also implemented Spark optimizations to improve the operational efficiency of these jobs. We use Spark broadcast joins to handle data skewness, which can otherwise lead to data spillage and extra storage needs. We also tuned the Spark jobs to avoid a large number of small files, a known problem with Spark if not handled effectively; this happens mainly because Spark is a parallel processing system and data loading is done through multiple tasks, where each task can load data into multiple partitions. Data-intensive jobs are run using Spark stages.
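The following sketch shows the two tuning techniques described above with hypothetical dataset names: broadcasting the small side of a skewed join, and coalescing the output so parallel tasks don’t produce a large number of small files.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("merchant-report").getOrCreate()

val transactions = spark.read.parquet("s3://data-lake/transactions/") // large, skewed
val merchants    = spark.read.parquet("s3://data-lake/merchants/")    // small dimension

// Broadcast the small dimension so skewed join keys don't cause shuffle spill
val enriched = transactions.join(broadcast(merchants), Seq("merchant_id"))

// Coalesce before writing to limit the number of output files
enriched.coalesce(64)
  .write.mode("overwrite")
  .parquet("s3://data-lake/reports/merchant_daily/")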

The following is a sample Azkaban flow definition showing how the jobs are ordered (jobC runs after jobA and jobB):

nodes:
  - name: jobC
    type: noop
    # jobC depends on jobA and jobB
    dependsOn:
      - jobA
      - jobB

  - name: jobA
    type: command
    config:
      command: echo "This is an echoed text."

  - name: jobB
    type: command
    config:
      command: pwd

Validate the data

Accuracy of the data reports is vital for the modern data pipeline. The modernized pipeline has additional data reconciliation steps to improve the correctness of data across the platform. This is achieved by having greater programmatic control over the processed data. We could only reconcile data in the legacy pipeline after the entire data processing was complete. However, the modern data pipeline enables all the transactions to be reconciled at every step, which gives granular control for data validation. It also helps isolate the cause of any data processing errors. Automated tests were run before go-live to compare the data records generated by the legacy and the modern system to ensure data sanity. These steps helped ensure the overall sanity of the data processed by the new system. Deduplication of data is done frequently via on-demand queries to eliminate redundant data, thereby reducing the processing time. As an example, if transactions have already been consumed by the end clients but are still part of the dataset, they can be eliminated by deduplication, so that only the newer transactions are processed for end-client consumption.

The following sample query uses Spark SQL for on-demand deduplication of raw data at the reporting layer:

INSERT OVERWRITE TABLE <<table>>
select col1, col2, col3 -- ...coln
from (select t.*
            ,row_number() over(partition by <<primary_key_columns>> order by <<updated_timestamp>> desc) as rn
      from <<table>> t
     ) t
where rn = 1

What we achieved as part of the modernization

With the new data pipeline, we reduced the compute infrastructure by a factor of four, which helps save compute cost. The legacy stack ran on over 6,000 virtual cores; optimization techniques helped run the same system at an improved scale with approximately 1,500 virtual cores. We were able to reduce the compute and storage capacity for 400 TB of data and 40 data flows after migrating to Amazon EMR. We also achieved Spark optimizations, which helped reduce the runtime of the jobs by 95% (from 8–10 hours to 20–30 minutes), CPU consumption by 95%, I/O by 98%, and overall computation time by 80%. The incremental data processing approach helped scale the system despite data skewness, which wasn’t possible with the legacy solution.

Conclusion

In this post, we showed how Paytm modernized their data lake and data pipeline using Amazon EMR, Amazon S3, the underlying AWS Cloud infrastructure, and Apache Spark. Choosing these cloud and big data technologies helped address the challenges of operating a big data pipeline, because the type and volume of data from disparate sources add complexity to the analytical processing.

By partnering with AWS, the Paytm Central Data Platform team created a modern data pipeline in a short amount of time. It provides reduced data analytical times with astute scaling capabilities, generating high-quality reports for the executive management and merchants on a daily basis.

As next steps, do a deep dive that separates the data collection and data processing stages of your data pipeline system. Each stage of the data pipeline should be appropriately designed and scaled to reduce the processing time while maintaining the integrity of the reports generated as output.

If you have feedback about this post, submit comments in the Comments section below.


About the Authors

Rajat Bhardwaj is a Senior Technical Account Manager with Amazon Web Services based in India, with 23 years of work experience across multiple roles in software development, telecom, and cloud technologies. He works with AWS Enterprise customers, providing advocacy and strategic technical guidance to help plan and build solutions using AWS services and best practices. Rajat is an avid runner, having competed in several half and full marathons in recent years.

Kunal Upadhyay is a General Manager with Paytm Central Data Platform team based out of Bengaluru, India. Kunal has 16 years of experience in big data, distributed computing, and data intelligence. When not building software, Kunal enjoys travel and exploring the world, spending time with friends and family.

Use Amazon Kinesis Data Firehose to extract data insights with Coralogix

Post Syndicated from Tal Knopf original https://aws.amazon.com/blogs/big-data/use-amazon-kinesis-data-firehose-to-extract-data-insights-with-coralogix/

This is a guest blog post co-written by Tal Knopf at Coralogix.

Digital data is expanding exponentially, and the existing limitations to store and analyze it are constantly being challenged and overcome. According to Moore’s Law, digital storage becomes larger, cheaper, and faster with each successive year. The advent of cloud databases is just one example of how this is happening. Previous hard limits on storage size have become obsolete since their introduction.

In recent years, the amount of available data storage in the world has increased rapidly, reflecting this new reality. If you took all the information just from US academic research libraries and lumped it together, it would add up to 2 petabytes.

Coralogix has worked with AWS to bring you a solution to allow for the flawless integration of high volumes of data with the Coralogix platform for analysis, using Amazon Kinesis Data Firehose.

Kinesis Data Firehose and Coralogix

Kinesis Data Firehose delivers real-time streaming data to destinations like Amazon Simple Storage Service (Amazon S3), Amazon Redshift, or Amazon OpenSearch Service (successor to Amazon Elasticsearch Service), and now supports delivering streaming data to Coralogix. There is no limit on the number of delivery streams, so you can use it to get data from multiple AWS services.

Kinesis Data Firehose provides built-in, fully managed error handling, transformation, conversion, aggregation, and compression functionality, so you don’t need to write applications to handle these complexities.

Coralogix is an AWS Partner Network (APN) Advanced Technology Partner with AWS DevOps Competency. The platform enables you to easily explore and analyze logs to gain deeper insights into the state of your applications and AWS infrastructure. You can analyze all your AWS service logs while storing only the ones you need, and generate metrics from aggregated logs to uncover and alert on trends in your AWS services.

Solution overview

Imagine a pipe flowing with data—messages, to be more specific. These messages can contain log lines, metrics, or any other type of data you want to collect.

Clearly, there must be something pushing data into the pipe; this is the provider. There must also be a mechanism for pulling data out of the pipe; this is the consumer.

Kinesis Data Firehose makes it easy to collect, process, and analyze real-time, streaming data by grouping the pipes together in the most efficient way to help with management and scaling.

It offers a few significant benefits compared to other solutions:

  • Keeps monitoring simple – With this solution, you can configure AWS Web Application Firewall (AWS WAF), Amazon Route 53 Resolver Query Logs, or Amazon API Gateway to deliver log events directly to Kinesis Data Firehose.
  • Integrates flawlessly – Most AWS services use Amazon CloudWatch by default to collect logs, metrics, and additional events data. CloudWatch logs can easily be sent using the Firehose delivery stream.
  • Flexible with minimum maintenance – To configure Kinesis Data Firehose with the Coralogix API as a destination, you only need to set up the authentication in one place, regardless of the amount of services or integrations providing the actual data. You can also configure an S3 bucket as a backup plan. You can back up all log events or only those exceeding a specified retry duration.
  • Scale, scale, scale – Kinesis Data Firehose scales up to meet your needs with no need for you to maintain it. The Coralogix platform is also built for scale and can meet all your monitoring needs as your system grows.

Prerequisites

To get started, you must have the following:

  • A Coralogix account. If you don’t already have an account, you can sign up for one.
  • A Coralogix private key.

To find your key, in your Coralogix account, choose API Keys on the Data Flow menu.

Locate the key for Send Your Data.

Set up your delivery stream

To configure your delivery stream, complete the following steps:

  1. On the Kinesis Data Firehose console, choose Create delivery stream.
  2. Under Choose source and destination, for Source, choose Direct PUT.
  3. For Destination, choose Coralogix.
  4. For Delivery stream name, enter a name for your stream.
  5. Under Destination settings, for HTTP endpoint name, enter a name for your endpoint.
  6. For HTTP endpoint URL, enter your endpoint URL based on your Region and Coralogix account configuration.
  7. For Access key, enter your Coralogix private key.
  8. For Content encoding, select GZIP.
  9. For Retry duration, enter 60.

To override the logs applicationName, subsystemName, or computerName, complete the optional steps under Parameters.

  1. For Key, enter the log name.
  2. For Value, enter a new value to override the default.
  3. For this post, leave the configuration under Buffer hints as is.
  4. In the Backup settings section, for Source record in Amazon S3, select Failed data only (recommended).
  5. For S3 backup bucket, choose an existing bucket or create a new one.
  6. Leave the settings under Advanced settings as is.
  7. Review your settings and choose Create delivery stream.

Logs subscribed to your delivery stream are immediately sent and available for analysis within Coralogix.
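Because the delivery stream uses Direct PUT as its source, an application can also push records to it directly. The following is a minimal sketch using the AWS SDK for Java v2 from Scala; the stream name and the JSON payload are placeholders, not values from this post.

import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.firehose.FirehoseClient
import software.amazon.awssdk.services.firehose.model.{PutRecordRequest, Record}

val firehose = FirehoseClient.create()

// Placeholder payload shaped as a JSON log event
val payload = """{"applicationName":"my-app","severity":3,"text":"sample log line"}"""

val request = PutRecordRequest.builder()
  .deliveryStreamName("my-coralogix-delivery-stream") // placeholder stream name
  .record(Record.builder().data(SdkBytes.fromUtf8String(payload + "\n")).build())
  .build()

firehose.putRecord(request)
firehose.close()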

Conclusion

Coralogix provides you with full visibility into your logs, metrics, tracing, and security data without relying on indexing to provide analysis and insights. When you use Kinesis Data Firehose to send data to Coralogix, you can easily centralize all your AWS service data for streamlined analysis and troubleshooting.

To get the most out of the platform, check out Getting Started with Coralogix, which provides information on everything from parsing and enrichment to alerting and data clustering.


About the Authors

Tal Knopf is the Head of Customer DevOps at Coralogix. He uses his vast experience in designing and building customer-focused solutions to help users extract the full value from their observability data. Previously, he was a DevOps engineer in Akamai and other companies, where he specialized in large-scale systems and CDN solutions.

Ilya Rabinov is a Solutions Architect at AWS. He works with ISVs at late stages of their journey to help build new products, migrate existing applications, or optimize workloads on AWS. His areas of interest include machine learning, artificial intelligence, security, DevOps culture, CI/CD, and containers.

How SailPoint solved scaling issues by migrating legacy big data applications to Amazon EMR on Amazon EKS

Post Syndicated from Richard Li original https://aws.amazon.com/blogs/big-data/how-sailpoint-solved-scaling-issues-by-migrating-legacy-big-data-applications-to-amazon-emr-on-amazon-eks/

This post is co-written with Richard Li from SailPoint.

SailPoint Technologies is an identity security company based in Austin, TX. Its software as a service (SaaS) solutions support identity governance operations in regulated industries such as healthcare, government, and higher education. SailPoint distinguishes multiple aspects of identity as individual identity security services, including cloud governance, SaaS management, access risk governance, file access management, password management, provisioning, recommendations, and separation of duties, as well as access certification, access insights, access modeling, and access requests.

In this post, we share how SailPoint updated its platform for big data operations, and solved scaling issues by migrating legacy big data applications to Amazon EMR on Amazon EKS.

The challenge with the legacy data environment

SailPoint acquired a SaaS software platform that processes and analyzes identity, resource, and usage data from multiple cloud providers, and provides access insights, usage analysis, and access risk analysis. The platform’s original design criteria focused on serving small to medium-sized companies. To deliver these analytics insights quickly, much of the processing was done inside many microservices over streaming connections.

After the acquisition, we set a goal to expand the platform’s capability to handle customers with large cloud footprints across multiple cloud providers, sometimes with hundreds or even thousands of accounts producing large amounts of cloud event data.

The legacy architecture took a simplistic approach to data processing, as shown in the following diagram. We processed the vast majority of event data in-service and ingested it directly into Amazon Relational Database Service (Amazon RDS), which we then merged with a graph database to form the final view.

We needed to convert this into a scalable process that could handle customers of any size. To address this challenge, we had to quickly introduce a big data processing engine in the platform.

How migrating to Amazon EMR on EKS helped solve this challenge

When evaluating the platform for our big data operations, several factors made Amazon EMR on EKS a top choice.

The amount of event data we receive at any given time is generally unpredictable. To stay cost-effective and efficient, we need a platform that is capable of scaling up automatically when the workload increases to reduce wait time, and can scale down when the capacity is no longer needed to save cost. Because our existing application workloads are already running on an Amazon Elastic Kubernetes Service (Amazon EKS) cluster with the cluster autoscaler enabled, running Amazon EMR on EKS on top of our existing EKS cluster fits this need.

Amazon EMR on EKS can safely coexist on an EKS cluster that is already hosting other workloads, be contained within a specified namespace, and have controlled access through Kubernetes role-based access control and AWS Identity and Access Management (IAM) roles for service accounts. Therefore, we didn’t have to build new infrastructure just for Amazon EMR. We simply linked up Amazon EMR on EKS with our existing EKS cluster running our application workloads. This reduced the amount of DevOps support needed, and significantly sped up our implementation and deployment timeline.

Unlike with Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2), because our EKS cluster spans multiple Availability Zones, we can control Spark pod placement using Kubernetes pod scheduling and placement strategies to achieve higher fault tolerance.

With the ability to create and use custom images in Amazon EMR on EKS, we could also utilize our existing container-based application build and deployment pipeline for our Amazon EMR on EKS workload without any modifications. This also gave us additional benefit in reducing job startup time because we package all job scripts as well as all dependencies with the image, without having to fetch them at runtime.

We also utilize AWS Step Functions as our core workflow engine. The native integration of Amazon EMR on EKS with Step Functions is another bonus where we didn’t have to build custom code for job dispatch. Instead, we could utilize the Step Functions native integration to seamlessly integrate Amazon EMR jobs with our existing workflow, with very little effort.

In merely 5 months, we were able to go from design, to proof of concept, to rolling out phase 1 of the event analytics processing. This vastly improved our event analytics processing capability by extending horizontal scalability, which gave us the ability to take customers with significantly larger cloud footprints than the legacy platform was designed for.

During the development and rollout of the platform, we also found that the Spark History Server provided by Amazon EMR on EKS was very useful in terms of helping us identify performance issues and tune the performance of our jobs.

As of this writing, the phase 1 rollout, which includes the event processing component of the core analytics processing, is complete. We’re now expanding the platform to migrate additional components onto Amazon EMR on EKS. The following diagram depicts our future architecture with Amazon EMR on EKS when all phases are complete.

In addition, to improve performance and reduce costs, we’re currently testing the Spark dynamic resource allocation support of Amazon EMR on EKS. This automatically scales the job executors up and down based on load, and therefore boosts performance when needed and reduces cost when the workload is low. Furthermore, we’re investigating the possibility of reducing overall cost and increasing performance by utilizing the pod template feature, which would allow us to seamlessly transition our Amazon EMR job workload to AWS Graviton-based instances.

Conclusion

With Amazon EMR on EKS, we can now onboard new customers and process vast amounts of data in a cost-effective manner, which we couldn’t do with our legacy environment. We plan to expand our Amazon EMR on EKS footprint to handle all our transform and load data analytics processes.


About the Authors

Richard Li is a senior staff software engineer on the SailPoint Technologies Cloud Access Management team.

Janak Agarwal is a product manager for Amazon EMR on Amazon EKS at AWS.

Kiran Guduguntla is a WW Go-to-Market Specialist for Amazon EMR at AWS. He works with AWS customers across the globe to strategize, build, develop, and deploy modern data analytics solutions.

How GE Aviation automated engine wash analytics with AWS Glue using a serverless architecture

Post Syndicated from Giridhar G Jorapur original https://aws.amazon.com/blogs/big-data/how-ge-aviation-automated-engine-wash-analytics-with-aws-glue-using-a-serverless-architecture/

This post is authored by Giridhar G Jorapur, GE Aviation Digital Technology.

Maintenance and overhauling of aircraft engines are essential for GE Aviation to increase time on wing gains and reduce shop visit costs. Engine wash analytics provide visibility into the significant time on wing gains that can be achieved through effective water wash, foam wash, and other tools. This empowers GE Aviation with digital insights that help optimize water and foam wash procedures and maximize fuel savings.

This post demonstrates how we automated our engine wash analytics process to handle the complexity of ingesting data from multiple data sources and how we selected the right programming paradigm to reduce the overall runtime of the analytics job. Prior to automation, analytics jobs took approximately 2 days to complete and ran only on an as-needed basis. In this post, we learn how to process large-scale data using AWS Glue and integrating with other AWS services such as AWS Lambda and Amazon EventBridge. We also discuss how to achieve optimal AWS Glue job performance by applying various techniques.

Challenges

When we considered automating and developing the engine wash analytics process, we observed the following challenges:

  • Multiple data sources – The analytics process requires data from different sources such as foam wash events from IoT systems, flight parameters, and engine utilization data from a data lake hosted in an AWS account.
  • Large dataset processing and complex calculations – We needed to run analytics for seven commercial product lines. One of the product lines has approximately 280 million records, which is growing at a rate of 30% year over year. We needed analytics to run against 1 million wash events and perform over 2,000 calculations, while processing approximately 430 million flight records.
  • Scalable framework to accommodate new product lines and calculations – Due to the dynamics of the use case, we needed an extensible framework to add or remove new or existing product lines without affecting the existing process.
  • High performance and availability – We needed to run analytics daily to reflect the latest updates in engine wash events and changes in flight parameter data.
  • Security and compliance – Because the analytics processes involve flight and engine-related data, the data distribution and access need to adhere to the stringent security and compliance regulations of the aviation industry.

Solution overview

The following diagram illustrates the architecture of our wash analytics solution using AWS services.

The solution includes the following components:

  • EventBridge (1) – We use a time-based EventBridge rule to schedule the daily process that captures the delta changes between runs.
  • Lambda (2a) – Lambda orchestrates AWS Glue job initiation, backup, and recovery on failure for each stage, using event-based EventBridge rules for alerting on these events.
  • Lambda (2b) – Foam cart events from IoT devices are loaded into staging buckets in Amazon Simple Storage Service (Amazon S3) daily.
  • AWS Glue (3) – The wash analytics need to handle a small subset of data daily, but the initial historical load and transformation is huge. Because AWS Glue is serverless, it’s easy to set up and run with no maintenance.
    • Copy job (3a) – We use an AWS Glue copy job to copy only the required subset of data from across AWS accounts by connecting to AWS Glue Data Catalog tables using a cross-account AWS Identity and Access Management (IAM) role.
    • Business transformation jobs (3b, 3c) – When the copy job is complete, Lambda triggers subsequent AWS Glue jobs. Because our jobs are both compute and memory intensive, we use G2.x worker nodes. We can use Amazon CloudWatch metrics to fine-tune our jobs to use the right worker nodes. To handle complex calculations, we split large jobs up into multiple jobs by pipelining the output of one job as input to another job.
  • Source S3 buckets (4a) – Flights, wash events, and other engine parameter data is available in source buckets in a different AWS account exposed via Data Catalog tables.
  • Stage S3 bucket (4b) – Data from another AWS account is required for calculations, and all the intermediate outputs from the AWS Glue jobs are written to the staging bucket.
  • Backup S3 bucket (4c) – Every day before starting the AWS Glue job, the previous day’s output from the output bucket is backed up in the backup bucket. In case of any job failure, the data is recovered from this bucket.
  • Output S3 bucket (4d) – The final output from the AWS Glue jobs is written to the output bucket.

Continuing our analysis of the architecture components, we also use the following:

  • AWS Glue Data Catalog tables (5) – We catalog flights, wash events, and other engine parameter data using Data Catalog tables, which are accessed by AWS Glue copy jobs from another AWS account.
  • EventBridge (6) – We use event-based EventBridge rules to monitor AWS Glue job state changes (SUCCEEDED, FAILED, TIMEOUT, and STOPPED) and orchestrate the workflow, including backup, recovery, and job status notifications.
  • IAM role (7) – We set up cross-account IAM roles to copy the data from one account to another from the AWS Glue Data Catalog tables.
  • CloudWatch metrics (8) – You can monitor many different CloudWatch metrics. The following metrics can help you decide on horizontal or vertical scaling when fine-tuning the AWS Glue jobs:
    • CPU load of the driver and executors
    • Memory profile of the driver
    • ETL data movement
    • Data shuffle across executors
    • Job run metrics, including active executors, completed stages, and maximum needed executors
  • Amazon SNS (9) – Amazon Simple Notification Service (Amazon SNS) automatically sends notifications to the support group on the error status of jobs, so they can take corrective action upon failure.
  • Amazon RDS (10) – The final transformed data is stored in Amazon Relational Database Service (Amazon RDS) for PostgreSQL (in addition to Amazon S3) to support legacy reporting tools.
  • Web application (11) – A web application is hosted on AWS Elastic Beanstalk, and is enabled with Auto Scaling exposed for clients to access the analytics data.

Implementation strategy

Implementing our solution included the following considerations:

  • Security – The data required for running analytics is present in different data sources and different AWS accounts. We needed to craft well-thought-out role-based access policies for accessing the data.
  • Selecting the right programming paradigm – PySpark evaluates DataFrames lazily. For PySpark to work efficiently with AWS Glue, we created DataFrames with only the required columns up front and performed column-wise operations (a short sketch follows the partitioning discussion below).
  • Choosing the right persistence storage – Writing to Amazon S3 enables multiple consumption patterns, and writes are much faster due to parallelism.

If we’re writing to Amazon RDS (for supporting legacy systems), we need to watch out for database connectivity and buffer lock issues while writing from AWS Glue jobs.

  • Data partitioning – Identifying the right key combination is important for partitioning the data so that Spark performs optimally. Our initial runs (without data partitioning) with 30 workers of type G2.x took 2 hours and 4 minutes to complete.

The following screenshot shows our CloudWatch metrics.

After a few dry runs, we arrived at partitioning by a specific column (df.repartition(columnKey)); with only 24 workers of type G2.x, the job completed in a comparable 2 hours and 7 minutes. The following screenshot shows our new metrics.

We can observe the difference in CPU and memory utilization—running with fewer nodes shows lower CPU utilization and a smaller memory footprint.
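
To make these strategies concrete, the following is a minimal PySpark sketch (not the actual GE job) that selects only the required columns up front, repartitions by a key column, and persists the intermediate result as Parquet in the stage bucket; the paths and column names are placeholders.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("wash-analytics-sketch").getOrCreate()

# Read only the columns the downstream calculations need (projection pruning)
flights = (
    spark.read.parquet("s3://stage-bucket/flights/")    # placeholder path
    .select("engine_id", "flight_date", "egt_margin")   # placeholder columns
)

# Repartition by the key used in downstream joins and aggregations
flights = flights.repartition("engine_id")

# Persist the intermediate output as Parquet instead of CSV
flights.write.mode("overwrite").parquet("s3://stage-bucket/intermediate/flights/")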

The following table shows how we achieved the final transformation with the strategies we discussed.

Iteration | Run Time | AWS Glue Job Status | Strategy
1 | ~12 hours | Unsuccessful/Stopped | Initial iteration
2 | ~9 hours | Unsuccessful/Stopped | Changing code to PySpark methodology
3 | 5 hours, 11 minutes | Partial success | Splitting a complex large job into multiple jobs
4 | 3 hours, 33 minutes | Success | Partitioning by column key
5 | 2 hours, 39 minutes | Success | Changing CSV to Parquet file format while storing the copied data from another AWS account and intermediate results in the stage S3 bucket
6 | 2 hours, 9 minutes | Success | Infra scaling: horizontal and vertical scaling

Conclusion

In this post, we saw how to build a cost-effective, maintenance-free solution using serverless AWS services to process large-scale data. We also learned how to gain optimal AWS Glue job performance with key partitioning, using the Parquet data format while persisting in Amazon S3, splitting complex jobs into multiple jobs, and using the right programming paradigm.

As we continue to solidify our data lake solution for data from various sources, we can use Amazon Redshift Spectrum to serve various future analytical use cases.


About the Authors

Giridhar G Jorapur is a Staff Infrastructure Architect at GE Aviation. In this role, he is responsible for designing enterprise applications and for the migration and modernization of applications to the cloud. Apart from work, Giri enjoys investing himself in spiritual wellness. Connect with him on LinkedIn.

How ENGIE scales their data ingestion pipelines using Amazon MWAA

Post Syndicated from Anouar Zaaber original https://aws.amazon.com/blogs/big-data/how-engie-scales-their-data-ingestion-pipelines-using-amazon-mwaa/

ENGIE—one of the largest utility providers in France and a global player in the zero-carbon energy transition—produces, transports, and trades electricity, gas, and energy services. With 160,000 employees worldwide, ENGIE is a decentralized organization and operates 25 business units with a high level of delegation and empowerment. ENGIE’s decentralized global customer base had accumulated lots of data, and it required a smarter, unique approach and solution to align its initiatives and provide data that is ingestible, organizable, governable, shareable, and actionable across its global business units.

In 2018, the company’s business leadership decided to accelerate its digital transformation through data and innovation by becoming a data-driven company. Yves Le Gélard, chief digital officer at ENGIE, explains the company’s purpose: “Sustainability for ENGIE is the alpha and the omega of everything. This is our raison d’être. We help large corporations and the biggest cities on earth in their attempts to transition to zero carbon as quickly as possible because it is actually the number one question for humanity today.”

ENGIE, like any other big enterprise, uses multiple extract, transform, and load (ETL) tools to ingest data into its data lake on AWS. However, these tools usually come with expensive licensing plans. “The company needed a uniform method of collecting and analyzing data to help customers manage their value chains,” says Gregory Wolowiec, the Chief Technology Officer who leads ENGIE’s data program. ENGIE wanted a license-free application, well integrated with multiple technologies and with a continuous integration, continuous delivery (CI/CD) pipeline to more easily scale all their ingestion processes.

ENGIE started using Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to solve this issue and started moving various data sources from on-premises applications and ERPs, AWS services like Amazon Redshift, Amazon Relational Database Service (Amazon RDS), and Amazon DynamoDB, external services like Salesforce, and other cloud providers to a centralized data lake on top of Amazon Simple Storage Service (Amazon S3).

Amazon MWAA is used in particular to collect and store harmonized operational and corporate data from different on-premises and software as a service (SaaS) data sources into a centralized data lake. The purpose of this data lake is to create a “group performance cockpit” that enables efficient, data-driven analysis and thoughtful decision-making by the ENGIE management board.

In this post, we share how ENGIE created a CI/CD pipeline for an Amazon MWAA project template using an AWS CodeCommit repository and plugged it into AWS CodePipeline to build, test, and package the code and custom plugins. In this use case, we developed a custom plugin to ingest data from Salesforce based on the Airflow Salesforce open-source plugin.

Solution overview

The following diagrams illustrate the solution architecture defining the implemented Amazon MWAA environment and its associated pipelines. It also describes the customer use case for Salesforce data ingestion into Amazon S3.

The following diagram shows the architecture of the deployed Amazon MWAA environment and the implemented pipelines.

The preceding architecture is fully deployed via infrastructure as code (IaC). The implementation includes the following:

  • Amazon MWAA environment – A customizable Amazon MWAA environment packaged with plugins and requirements and configured in a secure manner.
  • Provisioning pipeline – The admin team can manage the Amazon MWAA environment using the included CI/CD provisioning pipeline. This pipeline includes a CodeCommit repository plugged into CodePipeline to continuously update the environment and its plugins and requirements.
  • Project pipeline – This CI/CD pipeline comes with a CodeCommit repository that triggers CodePipeline to continuously build, test and deploy DAGs developed by users. Once deployed, these DAGs are made available in the Amazon MWAA environment.

The following diagram shows the data ingestion workflow, which includes the following steps:

  1. The DAG is triggered by Amazon MWAA manually or based on a schedule.
  2. Amazon MWAA initiates data collection parameters and calculates batches.
  3. Amazon MWAA distributes processing tasks among its workers.
  4. Data is retrieved from Salesforce in batches.
  5. Amazon MWAA assumes an AWS Identity and Access Management (IAM) role with the necessary permissions to store the collected data into the target S3 bucket.

This AWS Cloud Development Kit (AWS CDK) construct is implemented with the following security best practices:

  • With the principle of least privilege, you grant permissions to only the resources or actions that users need to perform tasks.
  • S3 buckets are deployed with security compliance rules: encryption, versioning, and blocking public access.
  • Authentication and authorization management is handled with AWS Single Sign-On (AWS SSO).
  • Airflow stores connections to external sources in a secure manner either in Airflow’s default secrets backend or an alternative secrets backend such as AWS Secrets Manager or AWS Systems Manager Parameter Store.

For this post, we step through a use case using the data from Salesforce to ingest it into an ENGIE data lake in order to transform it and build business reports.

Prerequisites for deployment

For this walkthrough, the following are prerequisites:

  • Basic knowledge of the Linux operating system
  • Access to an AWS account with administrator or power user (or equivalent) IAM role policies attached
  • Access to a shell environment or optionally with AWS CloudShell

Deploy the solution

To deploy and run the solution, complete the following steps:

  1. Install AWS CDK.
  2. Bootstrap your AWS account.
  3. Define your AWS CDK environment variables.
  4. Deploy the stack.

Install AWS CDK

The described solution is fully deployed with AWS CDK.

AWS CDK is an open-source software development framework to model and provision your cloud application resources using familiar programming languages. If you want to familiarize yourself with AWS CDK, the AWS CDK Workshop is a great place to start.

Install AWS CDK using the following commands:

npm install -g aws-cdk
# To check the installation
cdk --version

Bootstrap your AWS account

First, you need to make sure that the environment where you’re planning to deploy the solution has been bootstrapped. You only need to do this one time per environment where you want to deploy AWS CDK applications. If you’re unsure whether your environment has been bootstrapped already, you can always run the command again:

cdk bootstrap aws://YOUR_ACCOUNT_ID/YOUR_REGION

Define your AWS CDK environment variables

On Linux or macOS, define your environment variables with the following code:

export CDK_DEFAULT_ACCOUNT=YOUR_ACCOUNT_ID
export CDK_DEFAULT_REGION=YOUR_REGION

On Windows, use the following code:

setx CDK_DEFAULT_ACCOUNT YOUR_ACCOUNT_ID
setx CDK_DEFAULT_REGION YOUR_REGION

Deploy the stack

By default, the stack deploys a basic Amazon MWAA environment with the associated pipelines described previously. It creates a new VPC in order to host the Amazon MWAA resources.

The stack can be customized using the parameters listed in the following table.

To pass a parameter to the construct, you can use the AWS CDK runtime context. If you intend to customize your environment with multiple parameters, we recommend using the cdk.json context file with version control to avoid unexpected changes to your deployments. Throughout our example, we pass only one parameter to the construct. Therefore, for the simplicity of the tutorial, we use the --context or -c option of the cdk command, as in the following example:

cdk deploy -c paramName=paramValue -c paramName=paramValue ...
Parameter | Description | Default | Valid values
vpcId | VPC ID where the cluster is deployed. If none, creates a new one and needs the parameter cidr in that case. | None | VPC ID
cidr | The CIDR for the VPC that is created to host Amazon MWAA resources. Used only if the vpcId is not defined. | 172.31.0.0/16 | IP CIDR
subnetIds | Comma-separated list of subnet IDs where the cluster is deployed. If none, looks for private subnets in the same Availability Zone. | None | Subnet IDs list (comma-separated)
envName | Amazon MWAA environment name | MwaaEnvironment | String
envTags | Amazon MWAA environment tags | None | See the following JSON example: '{"Environment":"MyEnv", "Application":"MyApp", "Reason":"Airflow"}'
environmentClass | Amazon MWAA environment class | mw1.small | mw1.small, mw1.medium, mw1.large
maxWorkers | Amazon MWAA maximum workers | 1 | int
webserverAccessMode | Amazon MWAA environment access mode (private or public) | PUBLIC_ONLY | PUBLIC_ONLY, PRIVATE_ONLY
secretsBackend | Amazon MWAA environment secrets backend | Airflow | SecretsManager

Clone the GitHub repository:

git clone https://github.com/aws-samples/cdk-amazon-mwaa-cicd

Deploy the stack using the following command:

cd mwaairflow && \
pip install . && \
cdk synth && \
cdk deploy -c vpcId=YOUR_VPC_ID

The following screenshot shows the stack deployment:

The following screenshot shows the deployed stack:

Create solution resources

For this walkthrough, you should have the following prerequisites:

If you don’t have a Salesforce account, you can create a Salesforce developer account:

  1. Sign up for a developer account.
  2. Copy the host from the email that you receive.
  3. Log in to your new Salesforce account.
  4. Choose the profile icon, then Settings.
  5. Choose Reset my Security Token.
  6. Check your email and copy the security token that you receive.

After you complete these prerequisites, you’re ready to create the following resources:

  • An S3 bucket for Salesforce output data
  • An IAM role and IAM policy to write the Salesforce output data on Amazon S3
  • A Salesforce connection on the Airflow UI to be able to read from Salesforce
  • An AWS connection on the Airflow UI to be able to write on Amazon S3
  • An Airflow variable on the Airflow UI to store the name of the target S3 bucket

Create an S3 bucket for Salesforce output data

To create an output S3 bucket, complete the following steps:

  1. On the Amazon S3 console, choose Create bucket.

The Create bucket wizard opens.

  2. For Bucket name, enter a DNS-compliant name for your bucket, such as airflow-blog-post.
  3. For Region, choose the Region where you deployed your Amazon MWAA environment, for example, US East (N. Virginia) us-east-1.
  4. Choose Create bucket.

For more information, see Creating a bucket.

Create an IAM role and IAM policy to write the Salesforce output data on Amazon S3

In this step, we create an IAM policy that allows Amazon MWAA to write on your S3 bucket.

  1. On the IAM console, in the navigation pane, choose Policies.
  2. Choose Create policy.
  3. Choose the JSON tab.
  4. Enter the following JSON policy document, and replace airflow-blog-post with your bucket name:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": ["arn:aws:s3:::airflow-blog-post"]
        },
        {
          "Effect": "Allow",
          "Action": [
            "s3:PutObject",
            "s3:GetObject",
            "s3:DeleteObject"
          ],
          "Resource": ["arn:aws:s3:::airflow-blog-post/*"]
        }
      ]
    }

  5. Choose Next: Tags.
  6. Choose Next: Review.
  7. For Name, choose a name for your policy (for example, airflow_data_output_policy).
  8. Choose Create policy.

Let’s attach the IAM policy to a new IAM role that we use in our Airflow connections.

  1. On the IAM console, choose Roles in the navigation pane and then choose Create role.
  2. In the Or select a service to view its use cases section, choose S3.
  3. For Select your use case, choose S3.
  4. Search for the name of the IAM policy that we created in the previous step (airflow_data_output_policy) and select the policy.
  5. Choose Next: Tags.
  6. Choose Next: Review.
  7. For Role name, choose a name for your role (airflow_data_output_role).
  8. Review the role and then choose Create role.

You’re redirected to the Roles section.

  9. In the search box, enter the name of the role that you created and choose it.
  10. Copy the role ARN to use later to create the AWS connection on Airflow.

Create a Salesforce connection on the Airflow UI to be able to read from Salesforce

To read data from Salesforce, we need to create a connection using the Airflow user interface.

  1. On the Airflow UI, choose Admin.
  2. Choose Connections, and then the plus sign to create a new connection.
  3. Fill in the fields with the required information.

The following table provides more information about each value.

Field | Mandatory | Description | Values
Conn Id | Yes | Connection ID to define and to be used later in the DAG | For example, salesforce_connection
Conn Type | Yes | Connection type | HTTP
Host | Yes | Salesforce host name | host-dev-ed.my.salesforce.com or host.lightning.force.com. Replace the host with your Salesforce host and don’t add http:// as a prefix.
Login | Yes | The Salesforce user name. The user must have read access to the Salesforce objects. | [email protected]
Password | Yes | The corresponding password for the defined user. | MyPassword123
Port | No | Salesforce instance port. By default, 443. | 443
Extra | Yes | Specify the extra parameters (as a JSON dictionary) that can be used in the Salesforce connection. security_token is the Salesforce security token for authentication. To get the Salesforce security token in your email, you must reset your security token. | {"security_token":"AbCdE..."}

Create an AWS connection in the Airflow UI to be able to write on Amazon S3

An AWS connection is required to upload data into Amazon S3, so we need to create a connection using the Airflow user interface.

  1. On the Airflow UI, choose Admin.
  2. Choose Connections, and then choose the plus sign to create a new connection.
  3. Fill in the fields with the required information.

The following table provides more information about the fields.

Field | Mandatory | Description | Value
Conn Id | Yes | Connection ID to define and to be used later in the DAG | For example, aws_connection
Conn Type | Yes | Connection type | Amazon Web Services
Extra | Yes | It is required to specify the Region. You also need to provide the role ARN that we created earlier. | {"region":"eu-west-1", "role_arn":"arn:aws:iam::123456789101:role/airflow_data_output_role"}

Create an Airflow variable on the Airflow UI to store the name of the target S3 bucket

We create a variable to set the name of the target S3 bucket. This variable is used by the DAG. So, we need to create a variable using the Airflow user interface.

  1. On the Airflow UI, choose Admin.
  2. Choose Variables, then choose the plus sign to create a new variable.
  3. For Key, enter bucket_name.
  4. For Val, enter the name of the S3 bucket that you created in a previous step (airflow-blog-post).

Create and deploy a DAG in Amazon MWAA

To be able to ingest data from Salesforce into Amazon S3, we need to create a DAG (Directed Acyclic Graph). To create and deploy the DAG, complete the following steps:

  1. Create a local Python DAG.
  2. Deploy your DAG using the project CI/CD pipeline.
  3. Run your DAG on the Airflow UI.
  4. Display your data in Amazon S3 (with S3 Select).

Create a local Python DAG

The provided SalesforceToS3Operator allows you to ingest data from Salesforce objects into an S3 bucket. Refer to standard Salesforce objects for the full list of objects you can ingest data from with this Airflow operator.

In this use case, we ingest data from the Opportunity Salesforce object. We retrieve the last 6 months’ data in monthly batches and we filter on a specific list of fields.

The DAG provided in the sample GitHub repository imports the last 6 months of the Opportunity object (one file per month), filtering the list of retrieved fields.

This operator takes two connections as parameters:

  • An AWS connection that is used to upload ingested data into Amazon S3.
  • A Salesforce connection to read data from Salesforce.

The following table provides more information about the parameters.

Parameter | Type | Mandatory | Description
sf_conn_id | string | Yes | Name of the Airflow connection that has the user name, password, and security token.
sf_obj | string | Yes | Name of the relevant Salesforce object (Account, Lead, Opportunity)
s3_conn_id | string | Yes | The destination S3 connection ID
s3_bucket | string | Yes | The destination S3 bucket
s3_key | string | Yes | The destination S3 key
sf_fields | string | No | The (optional) list of fields that you want to get from the object (Id, Name, and so on). If none (the default), then this gets all fields for the object.
fmt | string | No | The (optional) format that the S3 key of the data should be in. Possible values include CSV (default), JSON, and NDJSON.
from_date | date format | No | A specific date-time (optional) formatted input to run queries from for incremental ingestion. Evaluated against the SystemModStamp attribute. Not compatible with the query parameter and should be in date-time format (for example, 2021-01-01T00:00:00Z). Default: None
to_date | date format | No | A specific date-time (optional) formatted input to run queries to for incremental ingestion. Evaluated against the SystemModStamp attribute. Not compatible with the query parameter and should be in date-time format (for example, 2021-01-01T00:00:00Z). Default: None
query | string | No | A specific query (optional) to run for the given object. This overrides default query creation. Default: None
relationship_object | string | No | Some queries require relationship objects to work, and these are not the same names as the Salesforce object. Specify that relationship object here (optional). Default: None
record_time_added | boolean | No | Set this optional value to true if you want to add a Unix timestamp field to the resulting data that marks when the data was fetched from Salesforce. Default: False
coerce_to_timestamp | boolean | No | Set this optional value to true if you want to convert all fields with dates and datetimes into Unix timestamp (UTC). Default: False

The first step is to import the operator in your DAG:

from operators.salesforce_to_s3_operator import SalesforceToS3Operator

Then define your DAG default ARGs, which you can use for your common task parameters:

from datetime import timedelta

from airflow.utils.dates import days_ago

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': '[email protected]',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
    'sf_conn_id': 'salesforce_connection',
    's3_conn_id': 'aws_connection',
    's3_bucket': 'salesforce-to-s3',
}
...

Finally, you define the tasks to use the operator.

The following examples illustrate some use cases.

Salesforce object full ingestion

This task ingests all the content of the Salesforce object defined in sf_obj. This selects all the object’s available fields and writes them into the defined format in fmt. See the following code:

...
salesforce_to_s3 = SalesforceToS3Operator(
    task_id="Opportunity_to_S3",
    sf_conn_id=default_args["sf_conn_id"],
    sf_obj="Opportunity",
    fmt="ndjson",
    s3_conn_id=default_args["s3_conn_id"],
    s3_bucket=default_args["s3_bucket"],
    s3_key=f"salesforce/raw/dt={s3_prefix}/{table.lower()}.json",
    dag=salesforce_to_s3_dag,
)
...

Salesforce object partial ingestion based on fields

This task ingests specific fields of the Salesforce object defined in sf_obj. The selected fields are defined in the optional sf_fields parameter. See the following code:

...
salesforce_to_s3 = SalesforceToS3Operator(
    task_id="Opportunity_to_S3",
    sf_conn_id=default_args["sf_conn_id"],
    sf_obj="Opportunity",
    sf_fields=["Id","Name","Amount"],
    fmt="ndjson",
    s3_conn_id=default_args["s3_conn_id"],
    s3_bucket=default_args["s3_bucket"],
    s3_key=f"salesforce/raw/dt={s3_prefix}/{table.lower()}.json",
    dag=salesforce_to_s3_dag,
)
...

Salesforce object partial ingestion based on time period

This task ingests all the fields of the Salesforce object defined in sf_obj. The time period can be relative using from_date or to_date parameters or absolute by using both parameters.

The following example illustrates relative ingestion from the defined date:

...
salesforce_to_s3 = SalesforceToS3Operator(
    task_id="Opportunity_to_S3",
    sf_conn_id=default_args["sf_conn_id"],
    sf_obj="Opportunity",
    from_date="YESTERDAY",
    fmt="ndjson",
    s3_conn_id=default_args["s3_conn_id"],
    s3_bucket=default_args["s3_bucket"],
    s3_key=f"salesforce/raw/dt={s3_prefix}/{table.lower()}.json",
    dag=salesforce_to_s3_dag,
)
...

The from_date and to_date parameters support Salesforce date-time format. It can be either a specific date or literal (for example TODAY, LAST_WEEK, LAST_N_DAYS:5). For more information about date formats, see Date Formats and Date Literals.

For the full DAG, refer to the sample in the GitHub repository.

This code dynamically generates tasks that run queries to retrieve the data of the Opportunity object in the form of 1-month batches.
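
As a rough illustration of how such monthly batches could be generated (this is a sketch, not the exact sample DAG: the field list, the S3 key layout, and the use of dateutil.relativedelta are assumptions, and default_args and salesforce_to_s3_dag are assumed to be defined as in the earlier snippets):

from datetime import datetime

from dateutil.relativedelta import relativedelta

from operators.salesforce_to_s3_operator import SalesforceToS3Operator

# Anchor all batches on the first day of the current month (UTC)
first_of_month = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0, microsecond=0)

for i in range(6, 0, -1):
    start = first_of_month - relativedelta(months=i)
    end = first_of_month - relativedelta(months=i - 1)
    SalesforceToS3Operator(
        task_id=f"Opportunity_to_S3_{start:%Y_%m}",
        sf_conn_id=default_args["sf_conn_id"],
        sf_obj="Opportunity",
        sf_fields=["Id", "Name", "Amount", "StageName"],  # assumed field list
        fmt="ndjson",
        from_date=start.strftime("%Y-%m-%dT%H:%M:%SZ"),
        to_date=end.strftime("%Y-%m-%dT%H:%M:%SZ"),
        s3_conn_id=default_args["s3_conn_id"],
        s3_bucket=default_args["s3_bucket"],
        s3_key=f"salesforce/raw/dt={start:%Y-%m}/opportunity.json",  # assumed key layout
        dag=salesforce_to_s3_dag,
    )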

The sf_fields parameter allows us to extract only the selected fields from the object.

Save the DAG locally as salesforce_to_s3.py.

Deploy your DAG using the project CI/CD pipeline

As part of the CDK deployment, a CodeCommit repository and CodePipeline pipeline were created in order to continuously build, test, and deploy DAGs into your Amazon MWAA environment.

To deploy the new DAG, the source code should be committed to the CodeCommit repository. This triggers a CodePipeline run that builds, tests, and deploys your new DAG and makes it available in your Amazon MWAA environment.

  1. Sign in to the CodeCommit console in your deployment Region.
  2. Under Source, choose Repositories.

You should see a new repository mwaaproject.

  3. Push your new DAG to the mwaaproject repository under dags. You can either use the CodeCommit console or the Git command line to do so:
    1. CodeCommit console:
      1. Choose the project CodeCommit repository name mwaaproject and navigate under dags.
      2. Choose Add file and then Upload file and upload your new DAG.
    2. Git command line:
      1. To be able to clone and access your CodeCommit project with the Git command line, make sure Git client is properly configured. Refer to Setting up for AWS CodeCommit.
      2. Clone the repository with the following command after replacing <region> with your project Region:
        git clone https://git-codecommit.<region>.amazonaws.com/v1/repos/mwaaproject

      3. Copy the DAG file under dags and add it with the command:
        git add dags/salesforce_to_s3.py

      4. Commit your new file with a message:
        git commit -m "add salesforce DAG"

      5. Push the local file to the CodeCommit repository:
        git push

The new commit triggers a new pipeline that builds, tests, and deploys the new DAG. You can monitor the pipeline on the CodePipeline console.

  1. On the CodePipeline console, choose Pipelines in the navigation pane.
  2. On the Pipelines page, you should see mwaaproject-pipeline.
  3. Choose the pipeline to display its details.

After checking that the pipeline run is successful, you can verify that the DAG is deployed to the S3 bucket and therefore available on the Amazon MWAA console.

  1. On the Amazon S3 console, look for a bucket starting with mwaairflowstack-mwaaenvstackne and go under dags.

You should see the new DAG.

  2. On the Amazon MWAA console, choose DAGs.

You should be able to see the new DAG.

Run your DAG on the Airflow UI

Go to the Airflow UI and toggle on the DAG.

This triggers your DAG automatically.

Later, you can continue manually triggering it by choosing the run icon.

Choose the DAG and Graph View to see the run of your DAG.

If you have any issue, you can check the logs of the failed tasks from the task instance context menu.

Display your data in Amazon S3 (with S3 Select)

To display your data, complete the following steps:

  1. On the Amazon S3 console, in the Buckets list, choose the name of the bucket that contains the output of the Salesforce data (airflow-blog-post).
  2. In the Objects list, choose the name of the folder that has the object that you copied from Salesforce (opportunity).
  3. Choose the raw folder and the dt folder with the latest timestamp.
  4. Select any file.
  5. On the Actions menu, choose Query with S3 Select.
  6. Choose Run SQL query to preview the data.

Clean up

To avoid incurring future charges, delete the AWS CloudFormation stack and the resources that you deployed as part of this post.

  1. On the AWS CloudFormation console, delete the stack MWAAirflowStack.

To clean up the deployed resources using the AWS Command Line Interface (AWS CLI), you can simply run the following command:

cdk destroy MWAAirflowStack

Make sure you are in the root path of the project when you run the command.

After confirming that you want to destroy the CloudFormation stack, the solution’s resources are deleted from your AWS account.

The following screenshot shows the process of destroying the stack:

The following screenshot confirms the stack is undeployed.

  2. Navigate to the Amazon S3 console and locate the two buckets containing mwaairflowstack-mwaaenvstack and mwaairflowstack-mwaaproj that were created during the deployment.
  3. Select each bucket, delete its contents, and then delete the bucket.
  4. Delete the IAM role created to write on the S3 buckets.

Conclusion

ENGIE discovered significant value by using Amazon MWAA, enabling its global business units to ingest data in more productive ways. This post presented how ENGIE scaled their data ingestion pipelines using Amazon MWAA. The first part of the post described the architecture components and how to successfully deploy a CI/CD pipeline for an Amazon MWAA project template using a CodeCommit repository and plug it into CodePipeline to build, test, and package the code and custom plugins. The second part walked you through the steps to automate the ingestion process from Salesforce using Airflow with an example. For the Airflow configuration, you used Airflow variables, but you can also use Secrets Manager with Amazon MWAA using the secretsBackend parameter when deploying the stack.

The use case discussed in this post is just one example of how you can use Amazon MWAA to make it easier to set up and operate end-to-end data pipelines in the cloud at scale. For more information about Amazon MWAA, check out the User Guide.


About the Authors

Anouar Zaaber is a Senior Engagement Manager in AWS Professional Services. He leads internal AWS, external partner, and customer teams to deliver AWS cloud services that enable the customers to realize their business outcomes.

Amine El Mallem is a Data/ML Ops Engineer in AWS Professional Services. He works with customers to design, automate, and build solutions on AWS for their business needs.

Armando Segnini is a Data Architect with AWS Professional Services. He spends his time building scalable big data and analytics solutions for AWS Enterprise and Strategic customers. Armando also loves to travel with his family all around the world and take pictures of the places he visits.

Mohamed-Ali Elouaer is a DevOps Consultant with AWS Professional Services. He is part of the AWS ProServe team, helping enterprise customers solve complex problems related to automation, security, and monitoring using AWS services. In his free time, he likes to travel and watch movies.

Julien Grinsztajn is an Architect at ENGIE. He is part of the Digital & IT Consulting ENGIE IT team working on the definition of the architecture for complex projects related to data integration and network security. In his free time, he likes to travel the oceans to meet sharks and other marine creatures.

Monitoring Juniper Mist wireless network

Post Syndicated from Brian van Baekel original https://blog.zabbix.com/monitoring-juniper-mist-wireless-network/19093/

As a Premium Zabbix partner, Opensource ICT Solutions builds Zabbix solutions all over the world. That means we have customers with a broad variety of requirements and ideas about how to monitor things, which metrics are important, and how to alert on them. If one of those customers approaches us with a question about a task the likes of which we have never done before, it’s a challenge. And we love challenges! This blog post covers one such challenge that we solved some time ago.

Quanza is a leading infrastructure operator offering a broad portfolio of services to completely take over the management of networks, data centers and cloud services. With more than 70 colleagues and at least as many specializations, everyone at Quanza works towards the same goal: designing, building, and operating an optimal IT infrastructure. Exactly like you would expect it… and then some. Quanza understands that you prefer to focus on your own innovation. By continuously mapping out your wishes, Quanza provides customized solutions that keep your network up and running 24×7. Today and in the future.

With a relentless focus on mission-critical environments, often of relevance to society, Quanza has an impressive line-up of customers. Some enterprises that chose to partner up with Quanza are SURF, Payvision, the Volksbank, and the Amsterdam Internet Exchange (AMS-IX), one of the world’s largest internet hubs.

Recently, customers started asking Quanza to embed Juniper MIST products for wired and wireless networks in their service portfolio. In order to fully support the network’s lifecycle (build, operate and innovate), the Juniper MIST products will need to be monitored by their 24×7 NOC. This is where we came into play, with our Zabbix knowledge.

We quickly decided to combine Quanza’s knowledge of the Juniper MIST equipment and API with our Zabbix knowledge to build the best possible monitoring solution.

SNMP or cloud?

The Juniper MIST solution is a cloud-based solution that provides a single pane of management for Juniper Networks products. Because it’s cloud-based, it’s not a “traditional” network solution, and SNMP is not an option for device monitoring: the devices communicate only with the cloud, and we cannot access them directly the way we used to with traditional network equipment.

So, we started to investigate other options. One of the most common options right now is talking to some sort of API and pulling the metrics from it. With the Zabbix “HTTP agent” item type, this is no problem at all. Unfortunately, that’s not how the MIST API works: it pushes data instead of letting you pull it (strictly speaking, pulling is possible, but it doesn’t scale at all). The Zabbix HTTP agent item type does allow trapping, but only in a specific Zabbix sender format, which the MIST API does not produce.

This means we have a problem. SNMP is not available. Pulling data is not a viable, scalable option. Pushing the data is an option, but Zabbix does not understand the format in which MIST pushes it.

Since we are not talking about some sort of proprietary monitoring tool which is completely closed and way too static, there is always a solution with Zabbix as long as you’re creative enough.

Getting data into Zabbix

We needed some middleware. Something that was able to receive that data from MIST and convert it into something that we can push into Zabbix.

That’s exactly what we did. Together with Quanza, we built middleware that uses an API token to authenticate against the MIST API endpoint. Once the authentication is successful, the middleware is allowed to subscribe to certain “channels”. These channels provide event and performance data. You can compare it with MQTT, where a subscription to channels/topics is needed to get the information you are interested in.

Mist Middleware explained

  • Step 1:  Authenticate using an API token.
  • Step 2: Subscribe to channels
  • Step 3: Receive performance and event data
  • Step 4: Filter out only the relevant (performance) data for Zabbix
  • Step 5: Push into Zabbix

Once we had this in place, the MIST part was finished. We had our data and were able to push it into the monitoring solution.
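
To make steps 4 and 5 a bit more tangible, here is a minimal Python sketch of that last leg of the middleware: filtering a received message and pushing it into a Zabbix trapper item with the zabbix_sender utility. The message structure, the host naming scheme, and the item key are assumptions for illustration, not the actual implementation.

import json
import subprocess

def push_to_zabbix(host, key, value, zabbix_server="zabbix.example.com"):
    # Push a single value into a trapper item on the given Zabbix host
    subprocess.run(
        ["zabbix_sender", "-z", zabbix_server, "-s", host, "-k", key, "-o", str(value)],
        check=True,
    )

def handle_mist_message(raw_message):
    # Keep only device statistics and forward them to the matching AP host in Zabbix
    message = json.loads(raw_message)
    data = message.get("data", message)
    if "mac" not in data:
        return  # not a device-stats message, ignore it
    host = "AP-{}".format(data["mac"])  # assumed host naming scheme
    push_to_zabbix(host, "mist.performance.raw", json.dumps(data))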

Parsing in Zabbix

So, right now we have the data available for Zabbix. Time to find a neat way to use it. As the environments (both inventory and the types of equipment that are used) might be dynamic, we definitely do not want to apply any manual work to monitor newly added sites/equipment.

That means that low-level discovery rules are pretty much the only viable solution.

Here we go:

Describing host prototypes

Within Zabbix, we configure one host (the discovery host) and apply a template to that host with exactly one LLD rule: query our middleware and, based on the information received, create new hosts (host prototypes).

The data that is received looks like this:

{
    "NODEID": "<NODEID>",
    "NAME": "AP-<SITE>-<NUMBER>",
    "SITENAME": "<SITENAME>",
    "SITEID": "<SITEID>",
    "MAC": "<MAC ADDRESS>",
    "ORGNAME": "<ORG NAME>"
},

Those newly discovered hosts will have the names of the AP and the corresponding organization and location (in Mist: site). We also link a template to the discovered host and add it to a host group with the variables we’ll need later, such as the organization, site name, site ID, and so on.

So, we need to parse those JSON elements. Luckily, Zabbix provides, within the LLD rule configuration, the option to parse this into LLD macros; for example, the node ID is parsed into {#ID} with the JSONPath $.NODEID, the AP name into {#NAME} with $.NAME, and so on:

LLD macro configuration

Once this process is complete, we have a new host per AP. Of course, there is no data on that host yet, and polling the middleware or Mist per host is a bad idea: scalability would become extremely problematic with more than a few organizations and sites configured in the Mist environment. As we’re building this with a big network integrator, scalability matters, and we do not want to risk a noticeable performance impact by using polling.

How about pushing data from the middleware into Zabbix? Once the data is received from Mist by the middleware, it’s parsed and filtered, and then whatever is relevant is pushed out to Zabbix. We decided the best option is to push per host, as those hosts are already available in Zabbix.

Now we should ensure two things:

    • Do not overwhelm Zabbix with the data being pushed in
    • Get all the data with the least number of ‘pushes’ into Zabbix

Again, the flexibility of Zabbix is extremely useful here. On the AP hosts, there is a template with exactly one trapper item: receive performance data. From there, everything is handled by the Zabbix master/dependent item concept, and we extract data like temperatures, CPU load, memory usage, and so on.

At the same time, we receive data regarding network usage (interface statistics) and radio information. As we do not know upfront how many network interfaces and radios there are on a particular access point, we do not want to hard-code such information. Here we combine the concept of low-level discovery with dependent items (the following blog post covers the logic behind such an approach: Low-Level Discovery with Dependent items – Zabbix Blog).

Using low-level discovery with dependent items, all relevant items are created dynamically, in such a way that a change on the MIST side (for example, a new type of access point) doesn’t require changes on the Zabbix side. Monitoring starts within minutes, and you’ll never miss any problem that might arise!

Just to give you an idea of the flow: the master item gets JSON like this (only a small portion is shown here) pushed into it from the middleware:

{
    "mac": "<MAC ADDRESS>",
    "model": "<MODEL>",
    "port_stat": {
        "eth0": {
            "up": true,
            "speed": 1000,
            "full_duplex": true,
            "tx_bytes": 37291245,
            "tx_pkts": 169443,
            "rx_bytes": 123742282,
            "rx_pkts": 779249,
            "rx_errors": 0,
            "rx_peak_bps": 14184,
            "tx_peak_bps": 5444
        }
    },
    "cpu_util": 2,
    "cpu_user": 652611,
    "cpu_system": 901455,
    "radio_stat": {
        "band_5": {
            "num_clients": <CLIENTS>,
            "channel": <CHANNEL>,
            "bandwidth": 0,
            "power": 0,
            "tx_bytes": 0,
            "tx_pkts": 0,
            "rx_bytes": 0,
            "rx_pkts": 0,
            "noise_floor": <NOISE>,
            "disabled": true,
            "usage": "5",
            "util_all": 0,
            "util_tx": 0,
            "util_rx_in_bss": 0,
            "util_rx_other_bss": 0,
            "util_unknown_wifi": 0,
            "util_non_wifi": 0
        }
    },
    "env_stat": {
        "cpu_temp": <CPU TEMP>,
        "ambient_temp": <AMBIENT TEMP>,
        "humidity": 0,
        "attitude": 0,
        "pressure": 0
    }
}

Within the master item, we’re basically not parsing anything; it’s just there to receive the values and push them to the dependent items. In the dependent items, we start cherry-picking only those metrics that we would like to see. Because the data is in JSON format, the JSONPath preprocessing step comes in handy. At the same time, we’re looking at efficiency, so a second step is added: discard unchanged with heartbeat (1d).

Example: getting the statistics of the 2.4 GHz band radio out of the master item. This is a JSONPath step that selects the relevant radio object under radio_stat (the payload above shows band_5 for the 5 GHz radio), followed by the discard unchanged with heartbeat step.

Item prototype preprocessing

Of course, this has to be done with all items.

So far, we’ve heavily focused on the technical part, but Zabbix has quite a few options to visualize the data as well. As we’re waiting for the next LTS release, we have only set up a very small dashboard with a few widgets. One of the better ones:

number_clients

Here we’re using the new graph type widget, but instead of plotting the number of clients per AP, we’re plotting a dataset with an “aggregate” function. Of course, if we look at the dashboard widgets, there are many more things that can be visualized…

Efficiency and security considerations

As we were building this, we had 2 main considerations:

    • Efficiency
    • Security

Efficiency, because we anticipate that Quanza will be responsible for quite a few MIST environments on top of the current ones in the near future, combined with a strict limit on allowed calls against the MIST API. As such, it is really important to keep the number of API calls as low as possible. Besides that, every new access point added increases the load on the Zabbix server. That is not really a problem, as Zabbix is perfectly capable of monitoring thousands of metrics simultaneously, but it has its limits, and you do not want to hit those limits in a production environment where the only solution is migrating to beefier hardware.

Security-wise, this challenge had a few things going on, since we’re talking to an externally exposed API. MIST can invoke webhooks, which might have been a bit easier (we explored it, but there were other things to keep in mind going down that road); the main concern was that it requires Zabbix, or an interface to Zabbix, to be exposed to the internet. That didn’t look too appealing and would have required more maintenance. The preferable solution was to create the middleware, where we have full control over which queries are executed, how the API token is protected, which connections are established, and so on.

Conclusion

Although this question was challenging, together with Quanza we created a scalable, secure, and dynamic solution. Zabbix is flexible enough to facilitate the tricks required to provide reliable monitoring and alerting in an efficient and secure manner. We strongly believe the only limitation is your own creativity and this case proves that once again.

Quanza can now ensure the availability of their customers’ Juniper MIST-based networks, and if something breaks, their 24×7 manned NOC can take whatever action is required to restore the customer’s network – all thanks to the flexibility of Zabbix.

The post Monitoring Juniper Mist wireless network appeared first on Zabbix Blog.

ZABBIX – Open-Source Monitoring Software for Automotive Monitoring

Post Syndicated from Dmitry Lambert original https://blog.zabbix.com/zabbix-open-source-monitoring-software-for-automotive-monitoring/18776/

In this article, I will try to cover a theoretical model for monitoring your vehicle fleet at minimal to no cost by using the ELM327 microcontroller, a Python library to process the collected data, and a Zabbix proxy running on a small Raspberry Pi device to store and send the collected metrics to the central Zabbix server.

Expanding the scope of our Zabbix instance

The first thing that comes to mind when someone mentions a monitoring system is pretty simple. People think about server monitoring, where servers usually means Linux and Windows systems, and about network monitoring for all kinds of flavors of switches, routers, firewalls, and so on. But by putting so much focus on these standard things, we are in some way limiting the possibilities of monitoring systems. Zabbix has proven itself as an extremely powerful monitoring tool that can combine and monitor all client infrastructure – no matter if we are talking about the aforementioned servers, network devices, services, applications, or anything else. And most importantly – Zabbix is truly a 100% open-source product, which allows anyone to use all of the listed functionality for free.

Please keep in mind that there are no doubt systems available that were created exactly for the purpose I will cover here. Maybe they are more reliable. Perhaps they require less effort to achieve the desired result. But that is exactly why the presented model is mostly theoretical, with the primary goal of showing that it is hard to confine Zabbix within functional boundaries. Usually, the only limitation is our imagination. And it is up to you to treat this information as pure entertainment or to try to implement it where you find it suitable.

Monitoring a car fleet

Let’s get straight to the point. You don’t need to own a huge logistics company with a thousand-vehicle fleet to understand it. In simple words – if you or any of your relatives own a car, you should be aware that cars tend to break. As usually happens, there can be many types of issues, from a flat tire to ongoing damage in the gearbox or engine. It is important to understand that vehicles themselves are becoming smarter. If in the past a car was a purely mechanical device, nowadays it is a highly complicated electronic system on top of that mechanical device, one that can diagnose the slightest deviations from the norms set by the manufacturer and report a malfunction either with an indicator light on your dash or simply with a log message that is accessible only with specialized software or tools.

Keep in mind that malfunctions in vehicles are not as simple as a boolean (works or doesn’t work). In most cases, an issue is noticed before the car is unable to move, and the purpose is to be informed and fix the issue before it turns into a defect that actually prevents the vehicle from functioning.

And now think about this from an automotive business perspective. We may be talking about hundreds of vehicles that are always on the move to deliver something or someone on time. It should be obvious that in such a niche business, each of these vehicles may travel close to a thousand kilometers per day.

Thankfully, as mentioned previously, the smart diagnostic system will let you know about all the potential problems. On the other hand, the driver of the vehicle usually has nothing to do with its repairs or technical condition. So in a perfect world, we should have a few technical employees who simply ask a returning driver whether everything is fine with the car after their shift – are there any errors – and connect the diagnostic software to read its logs and make sure that everything actually is OK. If it’s not, the information should be passed to the technical department to move this vehicle to maintenance.

Why such pressure? Well, remember that most of these notifications serve as a warning that something is not working as it should, but currently, it is not causing harm. However, if it is ignored, there is a high chance that at some point, the vehicle will not be able to continue on its way to the destination.

So this is where Zabbix joins the conversation. Imagine if all this data transfer from the vehicle diagnostic system to the responsible employees happened automatically, with potential error prioritization and escalation to further levels if any vehicle has an ongoing issue that remains active for multiple days. And remember that Zabbix is a completely free and open-source system, which means that we could achieve this result for free. And we are absolutely not limited to DTC (Diagnostic Trouble Code) readings. Combining this ecosystem with the recent Zabbix 6.0 LTS release, we can create a geomap with the current location of any vehicle in our fleet. With a little effort, we can also get speed measurements, long stops, starts, and much more.

This is the part where the tested but still theoretical model comes into play. By now, we are aware that a car is way smarter than it may look, and it gathers and stores a lot of useful information. However, the Zabbix monitoring system typically sits somewhere in our headquarters and monitors generic metrics of our IT infrastructure. So how could we potentially get this information from our vehicle to Zabbix?

Since all this information is stored in the ECU (Electronic Control Unit), there is also a way to read it, and that is through the OBD (On-Board Diagnostics) socket using a standardized protocol. Like anything else, OBD has multiple versions and communication protocols. Still, if we are talking about reasonably modern cars, most likely we are talking about OBD-II, which standardizes the electrical signaling and messaging format.

Using ELM327 to gather data

Precisely this OBD-II interface will help us gather all the information from the vehicle and transfer it to our Zabbix monitoring system. Initially, this may still sound unclear: we have some kind of socket to access our ECU, but how can we actually gather meaningful data? For that, we will need the ELM327.

The ELM327 is a programmed microcontroller produced by ELM Electronics for translating the OBD interface. Even today, the ELM327 command protocol is one of the most popular PC-to-OBD interface standards. The ELM327 abstracts the low-level protocol and presents a simple interface that can be called via UART, typically by a handheld diagnostic tool or a computer program connected over USB, RS-232, Bluetooth, or Wi-Fi. In our case, we don’t need and don’t have a dedicated diagnostic tool, so we will have to use something else to work with OBD-II and translate the incoming data. With the ELM327, that is straightforward: you can purchase an ELM327 OBD2 Bluetooth adapter on Amazon for a couple of dollars, and it will be enough to provide the required functionality.

Data processing with Python-OBD and Raspberry Pi

As usually happens, there is a Python library for exactly what we need – python-OBD, published under GPLv2. With it, we are not limited to stored DTC values; we can also read live data from the vehicle, such as speed, fuel pressure, coolant temperature, intake temperature, and much more.
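
To give an idea of what reading the ECU looks like in practice, here is a minimal sketch using the python-OBD library. The serial port is only an example (a typical Bluetooth RFCOMM device on Linux; python-OBD can also auto-detect the connection), and a real deployment would add reconnection and error handling.

import obd

# Connect to the ELM327 adapter; the port below is only an example.
connection = obd.OBD("/dev/rfcomm0")

# Stored diagnostic trouble codes: a list of (code, description) tuples
dtc_response = connection.query(obd.commands.GET_DTC)
print(dtc_response.value)

# A few live readings
for cmd in (obd.commands.SPEED, obd.commands.COOLANT_TEMP, obd.commands.INTAKE_TEMP):
    response = connection.query(cmd)
    if not response.is_null():
        print(cmd.name, response.value)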

The closer we get to the result, the simpler the task starts to look. At this point, we basically have everything we need: the data and the interface to read it from. The ELM327 transports the data to our device, and the Python library lets us translate and process it, so we can send clean data to Zabbix. The only open question is which device to use in the vehicle – something that can run our Python script and has GSM access to transfer the gathered data to the Zabbix server. In my example, the choice was as simple as it was cheap: a Raspberry Pi.

Once the Raspberry Pi is set up in the vehicle and connected – via Bluetooth or otherwise – to the ELM327 plugged into the OBD-II connector, and the Python script is running on the Pi to receive and process data from the ECU, we need to decide which piece of Zabbix software we want on this device.

Zabbix proxy for data storage

Considering that the car may drive through areas where internet coverage is poor, and that we don't want to lose data simply because there was no connection, I think it is best to install Zabbix proxy on the Raspberry Pi.

Zabbix proxy suits such a small setup perfectly and serves its main purpose here. The proxy has a local database that stores all information that has to be sent to the Zabbix server. If networking trouble prevents the data from reaching the server, it is kept in the local database until the connection is restored and the data is sent. Luckily for us, Zabbix provides official packages for Raspberry Pi OS, so we don't need any custom workarounds.

The Zabbix proxy can run in one of two modes (active or passive), which essentially determines the direction of communication. Purchasing a static IP address for each unit would not be the cheapest approach, so we will use an active proxy, which simply connects to our Zabbix server and sends all gathered information. Of course, there are validation measures to make sure that only designated devices can send data to the server, and if an even more secure approach is required, users can enable TLS encryption with PSK or certificates.

Collecting the current latitude and longitude

Previously I mentioned that, with the new Geomap widget, it is possible to get a live view of the current location of your whole fleet on a single dashboard. To do that, we obviously need live latitude and longitude readings, which neither the ECU nor a stock Raspberry Pi can provide. But this is the beauty of the Raspberry Pi: with minimal investment, we can add a GPS unit to it.

With a very simple Python script, we can gather the required data and move it to the Zabbix proxy installed on localhost, which then passes the information on to the Zabbix server so we can see it on the dashboard. Because this is not a native, straightforward monitoring scenario, we can't use native item types to collect this data. All collection must be done within the script, and the data is then pushed using the zabbix_sender utility. The purpose of this utility is very simple: take the data it is given and send it to a specified hostname.
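
A minimal sketch of such a collection-and-send step might look like the following. The host name and item keys are hypothetical and must match trapper items configured in the Zabbix frontend, and the GPS coordinates are placeholders standing in for a real GPS reading.

import subprocess

import obd

HOST = "fleet-vehicle-01"  # hypothetical host name as configured in Zabbix

def send_to_zabbix(key: str, value) -> None:
    # zabbix_sender pushes one value to the proxy running on localhost
    subprocess.run(
        ["zabbix_sender", "-z", "127.0.0.1", "-s", HOST, "-k", key, "-o", str(value)],
        check=True,
    )

connection = obd.OBD()
speed = connection.query(obd.commands.SPEED)
if not speed.is_null():
    # send just the numeric value; alternatively send the raw reading and
    # extract the number with a Zabbix preprocessing step on the server side
    send_to_zabbix("vehicle.speed", speed.value.magnitude)

# GPS coordinates could come from a gpsd client or a GPS HAT; these are
# placeholders showing how they would be pushed as separate items.
latitude, longitude = 56.9496, 24.1052
send_to_zabbix("vehicle.lat", latitude)
send_to_zabbix("vehicle.lon", longitude)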

Since Zabbix has a very powerful preprocessing engine, we don't have to over-complicate our script with data transformations just to meet the data visualization requirements within Zabbix. We can send the raw data as-is and then use a suitable preprocessing step in the Zabbix frontend to extract the value we want to visualize.

The many uses of the collected data

Once the data arrives in Latest data in the Zabbix frontend, consider the most complicated part of the task done. From here, as with any monitoring idea in Zabbix, the only limitation is your imagination. You can simply collect the data without defining any actions and review it yourself from time to time to see whether you can do anything meaningful with it.

You can also use the wide range of trigger functions within Zabbix to declare a problem when a particular value is received – for example, when a DTC appears on a vehicle, or when the average speed exceeds a threshold. You could even define coordinate boundaries so that, if a particular vehicle leaves a specified radius, a problem is raised in your monitoring system.

It is up to you how to react to these triggers. It could be just an entry in the Problems view of the Zabbix frontend. It could also automatically create an incident for your maintenance team with a message that a particular vehicle has worn-out brake pads that need to be replaced. And if those brake pads are still not replaced a full week after the issue was first noticed, you might want to receive a personalized message on your mobile phone so that you can escalate further.

It is no secret that there are flaws and downsides. As I mentioned at the beginning, there are software products and devices developed and adopted exactly for this purpose, and my approach may not be 100% reliable – data transfer from the ECU is not as live as reading CPU utilization from your computer. All of this is simply a reminder that monitoring is not limited to network devices and servers, and that Zabbix, which keeps growing every year and provides more and more features while remaining completely free and open source, is here to support your ideas and help bring them to life.

The post ZABBIX – Open-Source Monitoring Software for Automotive Monitoring appeared first on Zabbix Blog.

How Belcorp decreased cost and improved reliability in its big data processing framework using Amazon EMR managed scaling

Post Syndicated from Diego Benavides original https://aws.amazon.com/blogs/big-data/how-belcorp-decreased-cost-and-improved-reliability-in-its-big-data-processing-framework-using-amazon-emr-managed-scaling/

This is a guest post by Diego Benavides and Luis Bendezú, Senior Data Architects, Data Architecture Direction at Belcorp.

Belcorp is one of the main consumer packaged goods (CPG) companies providing cosmetics products in the region, with more than 50 years of history and a presence in around 13 countries in North, Central, and South America (AMER). Born in Peru and with its own product factory in Colombia, Belcorp has always stayed ahead of the curve, adapting its business model to customer needs and strengthening its strategy with technological trends to keep improving the customer experience. With this focus, Belcorp began to implement its own data strategy, encouraging the use of data for decision-making. Based on this strategy, the Belcorp data architecture team designed and implemented a data ecosystem that allows business and analytics teams to consume functional data, which they use to generate hypotheses and insights that materialize in better marketing strategies or novel products. This post details a series of continuous improvements carried out during 2021 to reduce the number of platform incidents reported at the end of 2020, meet the SLAs required by the business, and be more cost-efficient when using Amazon EMR, resulting in up to 30% savings for the company.

To stay ahead of the curve, the strongest companies have built data strategies that allow them to improve their main business strategies, or even create new ones, using data as a main driver. As one of the main consumer packaged goods (CPG) companies in the region, Belcorp is no exception; in recent years we have been working to implement data-driven decision-making.

We know that every good data strategy is aligned to business objectives and grounded in the main business use cases. Currently, all our team's efforts are focused on the final consumer, and almost all business initiatives are related to hyper-personalization, pricing, and customer engagement.

To support these initiatives, the data architecture department provides data services such as data integration, a single source of truth, data governance and data quality frameworks, data availability, data accessibility, and optimized time to market, in line with business requirements, as other large companies do. To provide even the minimal capabilities needed to support all these services, we needed a scalable, flexible, and cost-efficient data ecosystem. Belcorp started this adventure a couple of years ago using AWS services like Amazon Elastic Compute Cloud (Amazon EC2), AWS Lambda, AWS Fargate, Amazon EMR, Amazon DynamoDB, and Amazon Redshift, which currently feed our main analytical solutions with data.

As we grew, we had to continually improve our architecture design and processing framework to handle growing data volumes and more complex data solution requirements. We also had to adopt quality and monitoring frameworks to guarantee data integrity, data quality, and service level agreements (SLAs). As you can imagine, this is not an easy task, and it requires its own strategy. At the beginning of 2021, critical incidents affected operational stability, directly impacting business outcomes. Billing was also impacted as new, more complex workloads were added, causing an unexpected increase in platform costs. In response, we decided to focus on three challenges:

  • Operational stability
  • Cost-efficiency
  • Service level agreements

This post details some of the actions we carried out during 2021 on Belcorp's data processing framework based on Amazon EMR. We also discuss how these actions helped us face the challenges mentioned above and delivered economic savings to Belcorp, which was the data architecture team's main contribution to the company.

Overview of solution

Belcorp's data ecosystem is composed of seven key capability pillars (as shown in the following diagram) that define our architectural design and give us more or less flexible technological options. Our data platform can be classified as part of the second generation of data platforms, as described by Zhamak Dehghani in How to Move Beyond a Monolithic Data Lake to a Distributed Data Mesh. In fact, it has all the limitations and restrictions of a lakehouse approach, as discussed in the paper Lakehouse: A New Generation of Open Platforms that Unify Data Warehousing and Advanced Analytics.

Belcorp’s data platform supports two main use cases. On one side, it provides data to be consumed using visualization tools, encouraging self-service. On the other side, it provides functional data to end-users, like data scientists or data analysts, through distributed data warehouses and object storage more suited to advanced analytical practices.

The following reference design shows the two main layers in charge of providing functional data for these use cases. The data processing layer is composed of two sub-layers. The first is Belcorp's Data Lake Integrator, an in-house Python solution with a set of REST API services in charge of organizing all the data workloads and data stages inside the analytics repositories. It also works as a point of control to distribute the resources allocated to each Amazon EMR Spark job. The processing sub-layer is mainly composed of the EMR cluster, which is in charge of orchestrating, tracking, and maintaining all the Spark jobs developed using a Scala framework.

For the persistent repository layer, we use Amazon Simple Storage Service (Amazon S3) object storage as the data repository for analytics workloads, where we have designed a set of data stages with operational and functional purposes based on the reference architecture design. Discussing the repository design in more depth is out of scope for this post, but we must note that it covers all the common challenges related to data availability, data accessibility, data consistency, and data quality. In addition, it meets all the needs of Belcorp's business model, despite the limitations and restrictions we inherit from the design previously mentioned.

We can now move our attention to the main purpose of this post.

As we mentioned, we experienced critical incidents (some of which existed before) and unexpected cost increases at the beginning of 2021, which motivated us to take action. The following table lists some of the main issues that attracted our attention.

Reported Incident – Impact
Delay in Spark jobs on Amazon EMR – Core workloads take a long time
Delay in Amazon EMR node auto scaling – Workloads take a long time
Increase in Amazon EMR computational usage per node – Unexpected cost increase
Lost resource containers – Workloads processing large data volumes crash
Overestimated memory and CPUs – Unexpected cost increase

To face these issues, we decided to change strategies and started to analyze each issue in order to identify the cause. We defined two action lines based on three challenges that the leaders wanted us to work on. The following figure summarizes these lines and challenges.

The data lake architecture action line covers the architectural gaps and deprecated features that we identified as root causes of the incidents. The Spark development best practices action line relates to the Spark data solutions that had been causing instability due to bad practices during the development lifecycle. Around these action lines, our leaders defined three challenges in order to decrease the number of incidents and guarantee the quality of the service we provide: operational stability, cost-efficiency, and SLAs.

Based on these challenges, we defined three KPIs to measure the success of the project. Jira incidents allow us to validate that our changes are having a positive impact; billing per week shows the leaders that part of the changes we applied will gradually optimize cost; and runtime provides the business users with a better time to market.

We then defined the next steps and how to measure progress. Based on our monitoring framework, we determined that almost all incidents that arose were related to the data processing and persistent repository layers. Next, we had to decide how to solve them: we could make reactive fixes to achieve operational stability without impacting the business, or we could change our usual way of working, analyze each issue, and provide a definitive solution that optimizes our framework. As you can guess, we decided to change our way of working.

We performed a preliminary analysis to determine the main impacts and challenges. We then proposed the following actions and improvements based on our action lines:

  • Data lake architecture – We redesigned the EMR cluster; we’re now using core and task nodes
  • Spark development best practices – We optimized Spark parameters (RAM memory, cores, CPUs, and executor number)

In the next section, we explain in detail the actions and improvements proposed in order to achieve our goals.

Actions and improvements

As we mentioned in the previous section, the analysis made by the architecture team resulted in a list of actions and improvements that would help us face three challenges: operational stability, a cost-efficient data ecosystem, and SLAs.

Before going further, it's a good time to provide more details about the Belcorp data processing framework. We built it on Apache Spark using the Scala programming language. Our data processing framework is a set of scalable, parameterizable, and reusable Scala artifacts that give development teams a powerful tool to implement complex data pipelines and meet the most complex business requirements using Apache Spark technology. Through the Belcorp DevOps framework, we deploy each artifact to several non-production environments and then promote it into production, where the EMR cluster launches all the routines using the Scala artifacts that reference each conceptual area inside the analytical platform. This part of the cycle gives the teams some degree of flexibility and agility. However, for a time, we lost sight of the quality of the software we were developing with Apache Spark.

In this section, we dive into the actions and improvements we applied in order to optimize the Belcorp data processing framework and improve the architecture.

Redesigning the EMR cluster

The current design and implementation of the Belcorp data lake is not the first version. We're currently on version 2.0, and from the first implementation until now, we've tried different EMR cluster designs for the data processing layer. Initially, we used a fixed cluster with four nodes (as shown in the following figure), but when the auto scaling capability was launched and Belcorp's data workloads increased, we decided to adopt it to optimize resource usage and costs. However, an auto scaled EMR cluster has several options of its own. You can choose between core and task nodes, with a minimum and maximum number of each. In addition, you can select On-Demand or Spot Instances, and you can implement an optimized allocation strategy using EMR instance fleets to reduce the probability of Spot Instance loss. For more information about Amazon EMR resource allocation strategies, see Spark enhancements for elasticity and resiliency on Amazon EMR and Optimizing Amazon EMR for resilience and cost with capacity-optimized Spot Instances.

We tested all these capabilities, but we found some problems.

First, although AWS offers many capabilities and functionalities around Amazon EMR, if you don't have some degree of knowledge about the technology you want to use, you may encounter many issues as use cases arise. As we mentioned, we decided to use the Apache Spark data processing engine through Amazon EMR as part of the Belcorp data ecosystem, but we faced many issues. Whenever an incident appeared, the data architecture team in charge had to fix it as part of its operational and support tasks. Almost all these reactive fixes involved changing the Amazon EMR configuration to try different alternatives in order to resolve the incidents efficiently.

We figured out that almost all incidents were related to resource allocation, so we tested many configuration options such as instance types, increasing the number of nodes, customized rules for auto scaling, and fleet strategies. This last option was used to reduce node loss. At the end of 2020, we validated that an EMR cluster with automatic scaling enabled, a minimum capacity of three On-Demand core nodes running 24/7, and the ability to scale up to 25 On-Demand core nodes provided us with a stable data processing platform. At the beginning of 2021, more complex Spark jobs were deployed as part of the data processing routines inside the EMR cluster, causing operational instability again. In addition, billing was increasing unexpectedly, which alerted our leaders; the team needed to redesign the EMR cluster to maintain operational stability and optimize costs.

We soon realized that it was possible to reduce the current billing by up to 40% by using Spot Instances instead of keeping all core nodes on On-Demand consumption. Another infrastructure optimization we wanted to apply was to replace a number of core nodes with task nodes, because almost all Belcorp data workloads are memory-intensive and use Amazon S3 to read the source data and write the result datasets. The question was how to do that without losing the benefits of the current design. To answer it, we had the guidance of the AWS account team and our AWS Analytics and Big Data Specialist SA to clarify questions about the following:

  • Apache Spark implementation in Amazon EMR
  • Core and task node best practices for production environments
  • Spot Instance behavior in Amazon EMR

We definitely recommend addressing these three main points before applying any changes because, in our previous experience, making modifications in the dark can lead to a costly and underperforming Amazon EMR implementation. With that in mind, we redesigned the EMR cluster to use EMR managed scaling, which automatically resizes the cluster for best performance at the lowest possible cost. We defined a maximum of 28 capacity units, with three On-Demand core nodes always on (24/7) to support data workloads during the day. We then set a limit of six On-Demand core nodes to provide minimal HDFS capacity to support the remaining 22 task nodes, which are composed of Spot Instances. This final configuration is based on advice from AWS experts to have at least one core node for every six task nodes, keeping a 1:6 ratio. The following table summarizes our cluster design.

Cluster scaling policy – Amazon EMR managed scaling enabled
Minimum node units (MinimumCapacityUnits) – 3
Maximum node units (MaximumCapacityUnits) – 28
On-Demand limit (MaximumOnDemandCapacityUnits) – 6
Maximum core nodes (MaximumCoreCapacityUnits) – 6
Instance type – m4.10xlarge
Number of primary nodes – 1
Primary node instance type – m4.4xlarge

The following figure illustrates our updated and current cluster design.
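
For illustration, the following is a minimal sketch of how the managed scaling limits from the preceding table could be applied to an existing cluster with boto3. The cluster ID is a placeholder, and this is not Belcorp's actual provisioning code.

import boto3

emr = boto3.client("emr", region_name="us-east-1")

emr.put_managed_scaling_policy(
    ClusterId="j-XXXXXXXXXXXXX",  # hypothetical cluster ID
    ManagedScalingPolicy={
        "ComputeLimits": {
            "UnitType": "Instances",
            "MinimumCapacityUnits": 3,          # three On-Demand core nodes always on
            "MaximumCapacityUnits": 28,         # overall ceiling for core + task nodes
            "MaximumOnDemandCapacityUnits": 6,  # cap On-Demand usage; the rest run on Spot
            "MaximumCoreCapacityUnits": 6,      # keep roughly a 1:6 core-to-task ratio
        }
    },
)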

Tuning Spark parameters

As any good book about Apache Spark can tell you, Spark parameter tuning is the main topic you need to look into before deploying a Spark application in production.

Adjusting Spark parameters is the task of assigning resources (CPUs, memory, and the number of executors) to each Spark application. In this post, we don't focus on the driver's resources; we focus on the executors, because that's where we found the main issue in Belcorp's implementation.

After we applied improvements around join operations and cache strategies in Spark application development, we realized that some of those applications were assigned overestimated resources in the EMR cluster. In other words, the Spark applications reserved resources, but only about 30% of those resources were actually used. The following Ganglia report, captured during one of our tests, illustrates the overestimation of resource allocation for one Spark application job.

A big consequence of this behavior was the massive deployment of EMR nodes that weren't being properly utilized. That is, numerous nodes were provisioned by the auto scaling feature in response to a Spark application submission, but much of the capacity of those nodes remained idle. We show a basic example of this later in this section.

With this evidence, we began to suspect that we needed to adjust the Spark parameters of some of our Spark applications.

As we mentioned in previous sections, as part of the Belcorp data ecosystem we built a Data Pipelines Integrator, whose main responsibility is to maintain centralized control of the runs of each Spark application. To do that, it uses a JSON file containing the Spark parameter configuration and performs each spark-submit through the Livy service, as shown in the following example:

'/usr/lib/spark/bin/spark-submit' '--class' 'LoadToFunctional' '--conf' 'spark.executor.instances=62' '--conf' 'spark.executor.memory=17g' '--conf' 'spark.yarn.maxAppAttempts=2' '--conf' 'spark.submit.deployMode=cluster' '--conf' 'spark.master=yarn' '--conf' 'spark.executor.cores=5' 's3://<bucket-name>/FunctionalLayer.jar' '--system' 'CM' '--country' 'PE' '--current_step' 'functional' '--attempts' '1' '--ingest_attributes' '{"FileFormat": "zip", "environment": "PRD", "request_origin": "datalake_integrator", "next_step": "load-redshift"}' '--fileFormat' 'zip' '--next_step' 'load-redshift'

This JSON file contains the Spark parameter configuration of each Spark application related to an internal system and country we submit to the EMR cluster. In the following example, CM is the name of the system and PE is the country code that the data comes from:

"systems" : {
  "CM" : {
    "PE" : { 
      "params" : {"executorCores": 15, "executorMemory": "45g", "numExecutors": 50 },
      "conf" : { "spark.sql.shuffle.partitions" :120 }
    }
}

The problem with this approach is that, as we add more applications, managing these configuration files becomes more complex. In addition, many Spark applications were still using a default configuration defined long ago, when workloads were less expensive, so it was expected that some things would need to change. One example of a Spark application with uncalibrated parameters is shown in the following figure (we use four executor instances only for the example). In this case, we realized we were allocating executors with a lot of resources without following any of the Spark best practices. This resulted in the provisioning of "fat" executors (in Spark slang), each of which occupied at least one node. That means that if we define a Spark application to be submitted with 10 executors, we need at least 10 nodes of the cluster for a single run, which was very expensive for us.

When you deal with Spark parameter tuning challenges, it's always a good idea to follow expert advice. Perhaps the most important piece of advice concerns the number of cores per executor: experts suggest that an executor should have no more than four or five cores. We were familiar with this limit from our earlier Spark development in the Hadoop ecosystem, where it stems from HDFS I/O constraints – the more cores an executor has, the more concurrent I/O operations it performs against a single HDFS data node, and HDFS is well known to degrade under high concurrency. This particular constraint isn't a problem when we use Amazon S3 as storage, but the recommendation still holds because of JVM overhead: the more concurrent tasks (including I/O operations) an executor runs, the more work its JVM has to do, and performance degrades.

With these facts and previous findings, we realized that for some of our Spark applications we were using only 30% of the assigned resources. We needed to recalibrate the Spark job parameters in order to allocate only the best-suited resources and significantly reduce the overuse of EMR nodes. The following figure provides an example of the benefits of this improvement, where we can observe a 50% reduction in nodes compared with our earlier configuration.

We used the following optimized parameters for the Spark application related to the CM system:

"systems" : {
  "CM" : {
    "PE" : { 
      "params" : {"executorCores": 5, "executorMemory": "17g", "numExecutors": 62 },
      "conf" : { "spark.sql.shuffle.partitions" :120 }
    }
}
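
To see why the thinner executors reduce the node footprint, here is a small illustrative calculation comparing the old and new parameters. The node figures (40 vCPUs and roughly 150 GiB usable by YARN on an m4.10xlarge, with about 10% memory overhead per executor) are assumptions for the sake of the example; the exact values depend on the EMR and YARN configuration.

# Illustrative executor-packing arithmetic; the node capacity values are assumptions.
NODE_VCPUS = 40
NODE_YARN_MEM_GIB = 150
MEM_OVERHEAD = 1.10  # ~10% off-heap overhead on top of executor memory

def executors_per_node(cores: int, mem_gib: float) -> int:
    by_cpu = NODE_VCPUS // cores
    by_mem = int(NODE_YARN_MEM_GIB // (mem_gib * MEM_OVERHEAD))
    return min(by_cpu, by_mem)

def nodes_needed(num_executors: int, cores: int, mem_gib: float) -> int:
    per_node = executors_per_node(cores, mem_gib)
    return -(-num_executors // per_node)  # ceiling division

# Old "fat executor" config: 15 cores, 45g, 50 executors
print(nodes_needed(50, 15, 45))  # about 25 nodes
# New "thin executor" config: 5 cores, 17g, 62 executors
print(nodes_needed(62, 5, 17))   # about 8 nodes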

Results

In this post, we wanted to share the success story of our project to improve the Belcorp data ecosystem, built around two action lines and three challenges defined by our leaders, using AWS data technologies and in-house platforms.

We were clear about our objectives from the beginning, based on the defined KPIs, so we've been able to validate that the number of Jira incidents reported by the end of May 2021 dropped notably. The following figure shows a reduction of up to 75% with respect to previous months, with March standing out as a critical peak.

Based on this incident reduction, we found that almost all Spark job routines running in the EMR cluster benefitted from a runtime optimization, including the two most complex Spark jobs, which saw a reduction of up to 60%, as shown in the following figure.

Perhaps the most important contribution of the improvements made by the team is directly related to the billing per week. The EMR cluster redesign, the join operation improvements, the cache best practices, and the Spark parameter tuning all produced a notable reduction in the use of cluster resources. As we know, Amazon EMR calculates billing based on the time that cluster nodes have been on, regardless of whether they do any work. So, when we optimized EMR cluster usage, we optimized the costs we were generating as well. As shown in the following figure, in only 2 months, between March and May, we achieved a billing reduction of up to 40%. We estimate that we will save up to 26% of the annual billing that would have been generated without the improvements.

Conclusion and next steps

The data architecture team is in charge of the Belcorp data ecosystem’s continuous improvements, and we’re always being challenged to achieve a best-in-class architecture, craft better architectural solution designs, optimize cost, and create the most automated, flexible, and scalable frameworks.

At the same time, we’re thinking about the future of this data ecosystem—how we can adapt to new business needs, generate new business models, and address current architectural gaps. We’re working now on the next generation of the Belcorp data platform, based on novel approaches like data products, data mesh, and lake houses. We believe these new approaches and concepts are going to help us to cover our current architectural gaps in the second generation of our data platform design. Additionally, it’s going to help us better organize the business and development teams in order to obtain greater agility during the development cycle. We’re thinking of data solutions as a data product, and providing teams with a set of technological components and automated frameworks they can use as building blocks.

Acknowledgments

We would like to thank our leaders, especially Jose Israel Rico, Corporate Data Architecture Director, and Venkat Gopalan, Chief Technology, Data and Digital Officer, who inspire us to be customer centric, insist on the highest standards, and support every technical decision based on a stronger knowledge of the state of the art.


About the Authors

Diego Benavides is the Senior Data Architect of Belcorp in charge of the design, implementation, and the continuous improvement of the Global and Corporate Data Ecosystem Architecture. He has experience working with big data and advanced analytics technologies across many industry areas like telecommunication, banking, and retail.

Luis Bendezú works as a Senior Data Engineer at Belcorp. He’s in charge of continuous improvements and implementing new data lake features using a number of AWS services. He also has experience as a software engineer, designing APIs, integrating many platforms, decoupling applications, and automating manual jobs.

Mar Ortiz is a bioengineer who works as a Solutions Architect Associate at AWS. She has experience working with cloud compute and diverse technologies like media, databases, compute, and distributed architecture design.

Raúl Hugo is an AWS Sr. Solutions Architect with more than 12 years of experience in LATAM financial companies and global telco companies as a SysAdmin, DevOps engineer, and cloud specialist.

Ibotta builds a self-service data lake with AWS Glue

Post Syndicated from Erik Franco original https://aws.amazon.com/blogs/big-data/ibotta-builds-a-self-service-data-lake-with-aws-glue/

This is a guest post co-written by Erik Franco at Ibotta.

Ibotta is a free cash back rewards and payments app that gives consumers real cash for everyday purchases when they shop and pay through the app. Ibotta provides thousands of ways for consumers to earn cash on their purchases by partnering with more than 1,500 brands and retailers.

At Ibotta, we process terabytes of data every day. Our vision is to allow for these datasets to be easily used by data scientists, decision-makers, machine learning engineers, and business intelligence analysts to provide business insights and continually improve the consumer and saver experience. This strategy of data democratization has proven to be a key pillar in the explosive growth Ibotta has experienced in recent years.

This growth has also led us to rethink and rebuild our internal technology stacks. For example, as our datasets began to double in size every year combined with complex, nested JSON data structures, it became apparent that our data warehouse was no longer meeting the needs of our analytics teams. To solve this, Ibotta adopted a data lake solution. The data lake proved to be a huge success because it was a scalable, cost-effective solution that continued to fulfill the mission of data democratization.

The rapid growth that was the impetus for the transition to a data lake has now also forced upstream engineers to transition away from the monolith architecture to a microservice architecture. We now use event-driven microservices to build fault-tolerant and scalable systems that can react to events as they occur. For example, we have a microservice in charge of payments. Whenever a payment occurs, the service emits a PaymentCompleted event. Other services may listen to these PaymentCompleted events to trigger other actions, such as sending a thank you email.
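
For illustration, here is a minimal sketch of what emitting such an event into a Kinesis data stream (part of the ingestion path described later in this post) could look like. The stream name and event fields are hypothetical, not Ibotta's actual schema.

import json
import uuid
from datetime import datetime, timezone

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

# A hypothetical PaymentCompleted event emitted by a payments microservice
event = {
    "eventType": "PaymentCompleted",
    "eventId": str(uuid.uuid4()),
    "occurredAt": datetime.now(timezone.utc).isoformat(),
    "payload": {"userId": "user-123", "amountCents": 499, "currency": "USD"},
}

kinesis.put_record(
    StreamName="payments-events",                 # hypothetical stream name
    Data=json.dumps(event).encode("utf-8"),
    PartitionKey=event["payload"]["userId"],      # keeps a user's events ordered per shard
)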

In this post, we share how Ibotta built a self-service data lake using AWS Glue. AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development.

Challenge: Fitting flexible, semi-structured schemas into relational schemas

The move to an event-driven architecture, while highly valuable, presented several challenges. Our analytics teams use these events for use cases where low-latency access to real-time data is expected, such as fraud detection. These real-time systems have fostered a new area of growth for Ibotta and complement well with our existing batch-based data lake architecture. However, this change presented two challenges:

  • Our events are semi-structured and deeply nested JSON objects that don’t translate well to relational schemas. Events are also flexible in nature. This flexibility allows our upstream engineering teams to make changes as needed and thereby allows Ibotta to move quickly in order to capitalize on market opportunities. Unfortunately, this flexibility makes it very difficult to keep schemas up to date.
  • Adding to these challenges, in the last 3 years our analytics and platform engineering teams have doubled in size. Our data processing team, however, has stayed the same size, largely due to the difficulty of hiring qualified data engineers with specialized skills in building scalable pipelines, given industry demand. This meant that our data processing team couldn't keep up with the requests from our analytics teams to onboard new data sources.

Solution: A self-service data lake

To solve these issues, we decided that it wasn’t enough for the data lake to provide self-service data consumption features. We also needed self-service data pipelines. These would provide both the platform engineering and analytics teams with a path to make their data available within the data lake and with minimal to no data engineering intervention necessary. The following diagram illustrates our self-service data ingestion pipeline.

The pipeline includes the following components:

  1. Ibotta data stakeholders – Our internal data stakeholders wanted the capability to automatically onboard datasets. This user base includes platform engineers, data scientists, and business analysts.
  2. Configuration file – Our data stakeholders update a YAML file with specific details on what dataset they need to onboard. Sources for these datasets include our enterprise microservices.
  3. Ibotta enterprise microservices – Microservices make up the bulk of our Ibotta platform. Many of these microservices utilize events to asynchronously communicate important information. These events are also valuable for deriving analytics insights.
  4. Amazon Kinesis – After the configuration file is updated, data is immediately streamed to Amazon Kinesis. Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data so you can get timely insights and react quickly to new information. Streaming the data through Kinesis Data Streams and Kinesis Data Firehose gives us the flexibility to analyze the data in real time while also allowing us to store the data in Amazon Simple Storage Service (Amazon S3).
  5. Ibotta self-service data pipeline – This is the starting point of our data processing. We use Apache Airflow to orchestrate our pipelines once every hour.
  6. Amazon S3 raw data – Our data lands in Amazon S3 without any transformation. The complex nature of the JSON is retained for future processing or validation.
  7. AWS Glue – Our goal now is to take the complex nested JSON and create a simpler structure. AWS Glue provides a set of built-in transforms that we use to process this data. One of the transforms is Relationalize—an AWS Glue transform that takes semi-structured data and transforms it into a format that can be more easily analyzed by engines like Presto. This feature means that our analytics teams can continue to use the analytics engines they’re comfortable with and thereby lessen the impact of transitioning from relational data sources to semi-structured event data sources. The Relationalize function can flatten nested structures and create multiple dynamic frames. We use 80 lines of code to convert any JSON-based microservice message to a consumable table. We have provided this code base here as a reference and not for reuse.
    // Imports needed by this snippet
    import com.amazonaws.services.glue.DynamicFrame
    import com.amazonaws.services.glue.util.JsonOptions
    import org.apache.spark.sql.{Dataset, Row}

    // Convert the source DataFrame to a DynamicFrame and relationalize it
    val dynamicFrame: DynamicFrame = DynamicFrame(df, glueContext)
    val dynamicFrameCollection: Seq[DynamicFrame] = dynamicFrame.relationalize(rootTableName = glueSourceTable,
      stagingPath = glueTempStorage,
      options = JsonOptions.empty)
    // Convert the root frame back to a DataFrame and remove dot-notation from column names
    val relationalizedDF: Dataset[Row] = removeColumnDotNotationRelationalize(dynamicFrameCollection(0).toDF())
    // Repartition before writing out
    val repartitionedDF: Dataset[Row] = relationalizedDF.repartition(finalRepartitionValue.toInt)
    // Write the result to Amazon S3 as Snappy-compressed Parquet
    repartitionedDF
      .write
      .mode("overwrite")
      .option("compression", "snappy")
      .parquet(glueRelationalizeOutputS3Path)

  8. Amazon S3 curated – We then store the relationalized structures in Parquet format in Amazon S3.
  9. AWS Glue crawlers – AWS Glue crawlers allow us to automatically discover schemas and register them in the AWS Glue Data Catalog. This feature is a core component of our self-service data pipelines because it removes the requirement of having a data engineer manually create or update the schemas. Previously, if a change needed to occur, it flowed through a communication path that included platform engineers, data engineers, and analytics. AWS Glue crawlers effectively remove the data engineers from this communication path. This means new datasets or changes to datasets are made available quickly within the data lake. It also frees up our data engineers to continue working on improvements to our self-service data pipelines and other data paved road features (a sketch of registering such a crawler appears after this list).
  10. AWS Glue Data Catalog – A common problem in growing data lakes is that the datasets can become harder and harder to work with. A common reason for this is a lack of discoverability of data within the data lake, as well as a lack of clear understanding of what the datasets are conveying. The AWS Glue Data Catalog works in conjunction with AWS Glue crawlers to provide data lake users with searchable metadata for different data lake datasets. As AWS Glue crawlers discover new datasets or updates, they're recorded into the Data Catalog. You can then add descriptions at the table or field level for these datasets. This cuts down on the level of tribal knowledge that exists between various data lake consumers and makes it easy for these users to self-serve from the data lake.
  11. End-user data consumption – The end-users are the same as our internal stakeholders called out in Step 1.
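
As referenced in step 9, the following is a hedged sketch of how a crawler over the curated S3 prefix might be registered and started with boto3. The crawler name, IAM role, database, and bucket path are hypothetical placeholders, not Ibotta's actual configuration.

import boto3

glue = boto3.client("glue", region_name="us-east-1")

glue.create_crawler(
    Name="curated-events-crawler",                             # hypothetical name
    Role="arn:aws:iam::123456789012:role/GlueCrawlerRole",     # hypothetical IAM role
    DatabaseName="curated_events",                             # target Data Catalog database
    Targets={"S3Targets": [{"Path": "s3://example-curated-bucket/events/"}]},
    SchemaChangePolicy={
        "UpdateBehavior": "UPDATE_IN_DATABASE",  # pick up new or changed columns automatically
        "DeleteBehavior": "LOG",
    },
)

# Run it on demand; in practice it could also be scheduled or triggered by the pipeline.
glue.start_crawler(Name="curated-events-crawler")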

Benefits

The AWS Glue capabilities we described make it a core component of building our self-service data pipelines. When we initially adopted AWS Glue, we saw a three-fold decrease in our OPEX costs as compared to our previous data pipelines. This was further enhanced when AWS Glue moved to per-second billing. To date, AWS Glue has allowed us to realize a five-fold decrease in OPEX costs. Also, AWS Glue requires little to no manual intervention to ingest and process our over 200 complex JSON objects. This allows Ibotta to utilize AWS Glue each day as a key component in providing actionable data to the organization’s growing analytics and platform engineering teams.

We took away the following learnings in building self-service data platforms:

Conclusion and next steps

With the self-service data lake we have established, our business teams are realizing the benefits of speed and agility. As next steps, we’re going to improve our self-service pipeline with the following features:

  • AWS Glue streaming – Use AWS Glue streaming for real-time relationalization. With AWS Glue streaming, we can simplify our self-service pipelines by potentially getting rid of our orchestration layer while also getting data into the data lake sooner.
  • Support for ACID transactions – Implement data formats in the data lake that allow for ACID transactions. A benefit of this ACID layer is the ability to merge streaming data into data lake datasets.
  • Simplify data transport layers – Unify the data transport layers between the upstream platform engineering domains and the data domain. From the time we first implemented an event-driven architecture at Ibotta to today, AWS has offered new services such as Amazon EventBridge and Amazon Managed Streaming for Apache Kafka (Amazon MSK) that have the potential to simplify certain facets of our self-service and data pipelines.

We hope that this blog post will inspire your organization to build a self-service data lake using serverless technologies to accelerate your business goals.


About the Authors

Erik Franco is a Data Architect at Ibotta and is leading Ibotta’s implementation of its next-generation data platform. Erik enjoys fishing and is an avid hiker. You can often find him hiking one of the many trails in Colorado with his lovely wife Marlene and wonderful dog Sammy.

Shiv Narayanan is Global Business Development Manager for Data Lakes and Analytics solutions at AWS. He works with AWS customers across the globe to strategize, build, develop and deploy modern data platforms. Shiv loves music, travel, food and trying out new tech.

Matt Williams is a Senior Technical Account Manager for AWS Enterprise Support. He is passionate about guiding customers on their cloud journey and building innovative solutions for complex problems. In his spare time, Matt enjoys experimenting with technology, all things outdoors, and visiting new places.

How Goldman Sachs built persona tagging using Apache Flink on Amazon EMR

Post Syndicated from Balasubramanian Sakthivel original https://aws.amazon.com/blogs/big-data/how-goldman-sachs-built-persona-tagging-using-apache-flink-on-amazon-emr/

The Global Investment Research (GIR) division at Goldman Sachs is responsible for providing research and insights to the firm’s clients in the equity, fixed income, currency, and commodities markets. One of the long-standing goals of the GIR team is to deliver a personalized experience and relevant research content to their research users. Previously, in order to customize the user experience for their various types of clients, GIR offered a few distinct editions of their research site that were provided to users based on broad criteria. However, GIR did not have any way to create a personally curated content flow at the individual user level. To provide this functionality, GIR wanted to implement a system to actively filter the content that is recommended to their users on a per-user basis, keyed on characteristics such as the user’s job title or working region. Having this kind of system in place would both improve the user experience and simplify the workflows of GIR’s research users, by reducing the amount of time and effort required to find the research content that they need.

The first step towards achieving this is to directly classify GIR’s research users based on their profiles and readership. To that end, GIR created a system to tag users with personas. Each persona represents a type or classification that individual users can be tagged with, based on certain criteria. For example, GIR has a series of personas for classifying a user’s job title, and a user tagged with the “Chief Investment Officer” persona will have different research content highlighted and have a different site experience compared to one that is tagged with the “Corporate Treasurer” persona. This persona-tagging system can both efficiently carry out the data operations required for tagging users, as well as have new personas created as needed to fit use cases as they emerge.

In this post, we look at how GIR implemented this system using Amazon EMR.

Challenge

Given the number of contacts (i.e., millions) and the growing number of publications maintained in GIR’s research data store, creating a system for classifying users and recommending content is a scalability challenge. A newly created persona could potentially apply to almost every contact, in which case a tagging operation would need to be performed on several million data entries. In general, the number of contacts, the complexity of the data stored per contact, and the amount of criteria for personalization can only increase. To future-proof their workflow, GIR needed to ensure that their solution could handle the processing of large amounts of data as an expected and frequent case.

GIR’s business goal is to support two kinds of workflows for classification criteria: ad hoc and ongoing. An ad hoc criteria causes users that currently fit the defining criteria condition to immediately get tagged with the required persona, and is meant to facilitate the one-time tagging of specific contacts. On the other hand, an ongoing criteria is a continuous process that automatically tags users with a persona if a change to their attributes causes them to fit the criteria condition. The following diagram illustrates the desired personalization flow:

In the rest of this post, we focus on the design and implementation of GIR’s ad hoc workflow.

Apache Flink on Amazon EMR

To meet GIR's scalability demands, they determined that Amazon EMR was the best fit for their use case: a managed big data platform meant for processing large amounts of data using open source technologies such as Apache Flink. Although GIR evaluated a few other options that addressed their scalability concerns (such as AWS Glue), GIR chose Amazon EMR for its ease of integration into their existing systems and the possibility of adapting it for both batch and streaming workflows.

Apache Flink is an open source big data distributed stream and batch processing engine that efficiently processes data from continuous events. Flink offers exactly-once guarantees, high throughput and low latency, and is suited for handling massive data streams. Also, Flink provides many easy-to-use APIs and mitigates the need for the programmer to worry about failures. However, building and maintaining a pipeline based on Flink comes with operational overhead and requires considerable expertise, in addition to provisioning physical resources.

Amazon EMR empowers users to create, operate, and scale big data environments such as Apache Flink quickly and cost-effectively. We can optimize costs by using Amazon EMR managed scaling to automatically increase or decrease the cluster nodes based on workload. In GIR’s use case, their users need to be able to trigger persona-tagging operations at any time, and require a predictable completion time for their jobs. For this, GIR decided to launch a long-running cluster, which allows multiple Flink jobs to be submitted simultaneously to the same cluster.

Ad hoc persona-tagging infrastructure and workflow

The following diagram illustrates the architecture of GIR’s ad hoc persona-tagging workflow on the AWS Cloud.

This is a broad overview, and the specifics of networking and security between components are out of scope for this post.

At a high level, we can discuss GIR’s workflow in four parts:

  1. Upload the Flink job artifacts to the EMR cluster.
  2. Trigger the Flink job.
  3. Within the Flink job, transform and then store user data.
  4. Continuous monitoring.

You can interact with Flink on Amazon EMR via the Amazon EMR console or the AWS Command Line Interface (AWS CLI). After launching the cluster, GIR used the Flink API to interact with and submit work to the Flink application. The Flink API provided a bit more functionality and was much easier to invoke from an AWS Lambda application.
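
As an illustration of this interaction, here is a hedged sketch of how a Lambda-style Python function could drive a long-running Flink session through Flink's REST API. The endpoint, JAR name, entry class, and program arguments are hypothetical placeholders, not GIR's actual values.

import requests

FLINK_REST = "http://emr-primary-node.example.internal:8081"  # hypothetical endpoint

def upload_and_run(jar_path: str, entry_class: str, program_args: str) -> str:
    # 1. Upload the job JAR to the Flink cluster.
    with open(jar_path, "rb") as jar:
        upload = requests.post(f"{FLINK_REST}/jars/upload",
                               files={"jarfile": jar}).json()
    # The response contains the server-side file name, for example ".../<id>_job.jar".
    jar_id = upload["filename"].split("/")[-1]

    # 2. Trigger a run of the uploaded JAR.
    run = requests.post(
        f"{FLINK_REST}/jars/{jar_id}/run",
        json={"entryClass": entry_class, "programArgs": program_args},
    ).json()
    return run["jobid"]

if __name__ == "__main__":
    job_id = upload_and_run("persona-tagging.jar",                       # hypothetical JAR
                            "com.example.PersonaTaggingJob",             # hypothetical class
                            "--requestFile s3://example-bucket/request.json")
    print(f"Submitted Flink job {job_id}")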

The end goal of the setup is to have a pipeline where GIR’s internal users can freely make requests to update contact data (which in this use case is tagging or untagging contacts with various personas), and then have the updated contact data uploaded back to the GIR contact store.

Upload the Flink job artifacts to Amazon EMR

GIR has a GitLab project on-premises for managing the contents of their Flink job. To trigger the first part of their workflow and deploy a new version of the Flink job onto the cluster, a GitLab pipeline is run that first creates a .zip file containing the Flink job JAR file, properties, and config files.

The preceding diagram depicts the sequence of events that occurs in the job upload:

  1. The GitLab pipeline is manually triggered when a new Flink job should be uploaded. This transfers the .zip file containing the Flink job to an Amazon Simple Storage Service (Amazon S3) bucket on the GIR AWS account, labeled as “S3 Deployment artifacts”.
  2. A Lambda function (“Upload Lambda”) is triggered in response to the create event from Amazon S3.
  3. The function first uploads the Flink job JAR to the Amazon EMR Flink cluster, and retrieves the application ID for the Flink session.
  4. Finally, the function uploads the application properties file to a specific S3 bucket (“S3 Flink Job Properties”).

Trigger the Flink job

The second part of the workflow handles the submission of the actual Flink job to the cluster when job requests are generated. GIR has a user-facing web app called Personalization Workbench that provides the UI for carrying out persona-tagging operations. Admins and internal Goldman Sachs users can construct requests to tag or untag contacts with personas via this web app. When a request is submitted, a data file is generated that contains the details of the request.

The steps of this workflow are as follows:

  1. Personalization Workbench submits the details of the job request to the Flink data S3 bucket, labeled as "S3 Flink data".
  2. A Lambda function (“Run Lambda”) is triggered in response to the create event from Amazon S3.
  3. The function first reads the job properties file uploaded in the previous step to get the Flink job ID.
  4. Finally, the function makes an API call to run the required Flink job.

Process data

Contact data is processed according to the persona-tagging requests, and the transformed data is then uploaded back to the GIR contact store.

The steps of this workflow are as follows:

  1. The Flink job first reads the application properties file that was uploaded as part of the first step.
  2. Next, it reads the data file from the second workflow that contains the contact and persona data to be updated. The job then carries out the processing for the tagging or untagging operation.
  3. The results are uploaded back to the GIR contact store.
  4. Finally, both successful and failed requests are written back to Amazon S3.

Continuous monitoring

The final part of the overall workflow involves continuous monitoring of the EMR cluster in order to ensure that GIR’s tagging workflow is stable and that the cluster is in a healthy state. To ensure that the highest level of security is maintained with their client data, GIR wanted to avoid unconstrained SSH access to their AWS resources. Being constrained from accessing the EMR cluster’s primary node directly via SSH meant that GIR initially had no visibility into the EMR primary node logs or the Flink web interface.

By default, Amazon EMR archives the log files stored on the primary node to Amazon S3 at 5-minute intervals. Because this pipeline serves as a central platform for processing many ad hoc persona-tagging requests at a time, it was crucial for GIR to build a proper continuous monitoring system that would allow them to promptly diagnose any issues with the cluster.

To accomplish this, GIR implemented two monitoring solutions:

  • GIR installed an Amazon CloudWatch agent onto every node of their EMR cluster via bootstrap actions. The CloudWatch agent collects and publishes Flink metrics to CloudWatch under a custom metric namespace, where they can be viewed on the CloudWatch console. GIR configured the CloudWatch agent configuration file to capture relevant metrics, such as CPU utilization and total running EMR instances. The result is an EMR cluster where metrics are emitted to CloudWatch at a much faster rate than waiting for periodic S3 log flushes.
  • They also enabled the Flink UI in read-only mode by fronting the cluster’s primary node with a network load balancer and establishing connectivity from the Goldman Sachs on-premises network. This change allowed GIR to gain direct visibility into the state of their running EMR cluster and in-progress jobs.

Observations, challenges faced, and lessons learned

The personalization effort marked the first-time adoption of Amazon EMR within GIR. To date, hundreds of personalization criteria have been created in GIR’s production environment. In terms of web visits and clickthrough rate, site engagement with GIR personalized content has gradually increased since the implementation of the persona-tagging system.

GIR faced a few noteworthy challenges during development, as follows:

Restrictive security group rules

By default, Amazon EMR creates its security groups with rules that are less restrictive, because Amazon EMR can’t anticipate the specific custom settings for ingress and egress rules required by individual use cases. However, proper management of the security group rules is critical to protect the pipeline and data on the cluster. GIR used custom-managed security groups for their EMR cluster nodes and included only the needed security group rules for connectivity, in order to fulfill this stricter security posture.

Custom AMI

There were challenges in ensuring that the required packages were available when using custom Amazon Linux AMIs for Amazon EMR. As part of Goldman Sachs development SDLC controls, any Amazon Elastic Compute Cloud (Amazon EC2) instances on Goldman Sachs-owned AWS accounts are required to use internal Goldman Sachs-created AMIs. When GIR began development, the only compliant AMI that was available under this control was a minimal AMI based on the publicly available Amazon Linux 2 minimal AMI (amzn2-ami-minimal*-x86_64-ebs). However, Amazon EMR recommends using the full default Amazon Linux 2 AMI because it has all the necessary packages pre-installed. This resulted in various startup errors with no clear indication of the missing libraries.

GIR worked with AWS support to identify and resolve the issue by comparing the minimal and full AMIs, and installing the 177 missing packages individually (see the appendix for the full list of packages). In addition, various AMI-related files had been set to read-only permissions by the Goldman Sachs internal AMI creation process. Restoring these permissions to full read/write access allowed GIR to successfully start up their cluster.

Stalled Flink jobs

During GIR’s initial production rollout, GIR experienced an issue where their EMR cluster failed silently and caused their Lambda functions to time out. On further debugging, GIR found this issue to be related to an Akka quarantine-after-silence timeout setting. By default, it was set to 48 hours, causing the clusters to refuse more jobs after that time. GIR found a workaround by setting the value of akka.jvm-exit-on-fatal-error to false in the Flink config file.

Conclusion

In this post, we discussed how the GIR team at Goldman Sachs set up a system using Apache Flink on Amazon EMR to carry out the tagging of users with various personas, in order to better curate content offerings for those users. We also covered some of the challenges that GIR faced with the setup of their EMR cluster. This represents an important first step in providing GIR’s users with complete personalized content curation based on their individual profiles and readership.

Acknowledgments

The authors would like to thank the following members of the AWS and GIR teams for their close collaboration and guidance on this post:

  • Elizabeth Byrnes, Managing Director, GIR
  • Moon Wang, Managing Director, GIR
  • Ankur Gurha, Vice President, GIR
  • Jeremiah O’Connor, Solutions Architect, AWS
  • Ley Nezifort, Associate, GIR
  • Shruthi Venkatraman, Analyst, GIR

About the Authors

Balasubramanian Sakthivel is a Vice President at Goldman Sachs in New York. He has more than 16 years of technology leadership experience and has worked on many firmwide entitlement, authentication, and personalization projects. Bala drives the Global Investment Research division’s client access and data engineering strategy, including the architecture, design, and practices that enable the lines of business to make informed decisions and drive value. He is an innovator and an expert in developing and delivering large-scale distributed software that solves real-world problems, with demonstrated success envisioning and implementing a broad range of highly scalable platforms, products, and architectures.

Victor Gan is an Analyst at Goldman Sachs in New York. Victor joined the Global Investment Research division in 2020 after graduating from Cornell University, and has been responsible for developing and provisioning cloud infrastructure for GIR’s user entitlement systems. He is focused on learning new technologies and streamlining cloud systems deployments.

Manjula Nagineni is a Solutions Architect with AWS based in New York. She works with major financial services institutions, architecting and modernizing their large-scale applications while they adopt AWS Cloud services. She is passionate about designing big data workloads cloud-natively. She has over 20 years of IT experience in software development, analytics, and architecture across multiple domains such as finance, manufacturing, and telecom.

Appendix

GIR ran the following command to install the missing AMI packages:

yum install -y libevent.x86_64 python2-botocore.noarch \
device-mapper-event-libs.x86_64 bind-license.noarch libwebp.x86_64 \
sgpio.x86_64 rsync.x86_64 perl-podlators.noarch libbasicobjects.x86_64 \
langtable.noarch sssd-client.x86_64 perl-Time-Local.noarch dosfstools.x86_64 \
attr.x86_64 perl-macros.x86_64 hwdata.x86_64 gpm-libs.x86_64 libtirpc.x86_64 \
device-mapper-persistent-data.x86_64 libconfig.x86_64 setserial.x86_64 \
rdate.x86_64 bc.x86_64 amazon-ssm-agent.x86_64 virt-what.x86_64 zip.x86_64 \
lvm2-libs.x86_64 python2-futures.noarch perl-threads.x86_64 \
dmraid-events.x86_64 bridge-utils.x86_64 mdadm.x86_64 ec2-net-utils.noarch \
kbd.x86_64 libtiff.x86_64 perl-File-Path.noarch quota-nls.noarch \
libstoragemgmt-python.noarch man-pages-overrides.x86_64 python2-rsa.noarch \
perl-Pod-Usage.noarch psacct.x86_64 libnl3-cli.x86_64 \
libstoragemgmt-python-clibs.x86_64 tcp_wrappers.x86_64 yum-utils.noarch \
libaio.x86_64 mtr.x86_64 teamd.x86_64 hibagent.noarch perl-PathTools.x86_64 \
libxml2-python.x86_64 dmraid.x86_64 pm-utils.x86_64 \
amazon-linux-extras-yum-plugin.noarch strace.x86_64 bzip2.x86_64 \
perl-libs.x86_64 kbd-legacy.noarch perl-Storable.x86_64 perl-parent.noarch \
bind-utils.x86_64 libverto-libevent.x86_64 ntsysv.x86_64 yum-langpacks.noarch \
libjpeg-turbo.x86_64 plymouth-core-libs.x86_64 perl-threads-shared.x86_64 \
kernel-tools.x86_64 bind-libs-lite.x86_64 screen.x86_64 \
perl-Text-ParseWords.noarch perl-Encode.x86_64 libcollection.x86_64 \
xfsdump.x86_64 perl-Getopt-Long.noarch man-pages.noarch pciutils.x86_64 \
python2-s3transfer.noarch plymouth-scripts.x86_64 device-mapper-event.x86_64 \
json-c.x86_64 pciutils-libs.x86_64 perl-Exporter.noarch libdwarf.x86_64 \
libpath_utils.x86_64 perl.x86_64 libpciaccess.x86_64 hunspell-en-US.noarch \
nfs-utils.x86_64 tcsh.x86_64 libdrm.x86_64 awscli.noarch cryptsetup.x86_64 \
python-colorama.noarch ec2-hibinit-agent.noarch usermode.x86_64 rpcbind.x86_64 \
perl-File-Temp.noarch libnl3.x86_64 generic-logos.noarch python-kitchen.noarch \
words.noarch kbd-misc.noarch python-docutils.noarch hunspell-en.noarch \
dyninst.x86_64 perl-Filter.x86_64 libnfsidmap.x86_64 kpatch-runtime.noarch \
python-simplejson.x86_64 time.x86_64 perl-Pod-Escapes.noarch \
perl-Pod-Perldoc.noarch langtable-data.noarch vim-enhanced.x86_64 \
bind-libs.x86_64 boost-system.x86_64 jbigkit-libs.x86_64 binutils.x86_64 \
wget.x86_64 libdaemon.x86_64 ed.x86_64 at.x86_64 libref_array.x86_64 \
libstoragemgmt.x86_64 libteam.x86_64 hunspell.x86_64 python-daemon.noarch \
dmidecode.x86_64 perl-Time-HiRes.x86_64 blktrace.x86_64 bash-completion.noarch \
lvm2.x86_64 mlocate.x86_64 aws-cfn-bootstrap.noarch plymouth.x86_64 \
parted.x86_64 tcpdump.x86_64 sysstat.x86_64 vim-filesystem.noarch \
lm_sensors-libs.x86_64 hunspell-en-GB.noarch cyrus-sasl-plain.x86_64 \
perl-constant.noarch libini_config.x86_64 python-lockfile.noarch \
perl-Socket.x86_64 nano.x86_64 setuptool.x86_64 traceroute.x86_64 \
unzip.x86_64 perl-Pod-Simple.noarch langtable-python.noarch jansson.x86_64 \
pystache.noarch keyutils.x86_64 acpid.x86_64 perl-Carp.noarch GeoIP.x86_64 \
python2-dateutil.noarch systemtap-runtime.x86_64 scl-utils.x86_64 \
python2-jmespath.noarch quota.x86_64 perl-HTTP-Tiny.noarch ec2-instance-connect.noarch \
vim-common.x86_64 libsss_idmap.x86_64 libsss_nss_idmap.x86_64 \
perl-Scalar-List-Utils.x86_64 gssproxy.x86_64 lsof.x86_64 ethtool.x86_64 \
boost-date-time.x86_64 python-pillow.x86_64 boost-thread.x86_64 yajl.x86_64