Tag Archives: Big Data

Data Engineers of Netflix — Interview with Kevin Wylie

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/data-engineers-of-netflix-interview-with-kevin-wylie-7fb9113a01ea

Data Engineers of Netflix — Interview with Kevin Wylie

This post is part of our “Data Engineers of Netflix” series, where our very own data engineers talk about their journeys to Data Engineering @ Netflix.

Kevin Wylie is a Data Engineer on the Content Data Science and Engineering team. In this post, Kevin talks about his extensive experience in content analytics at Netflix since joining more than 10 years ago.

Kevin grew up in the Washington, DC area, and received his undergraduate degree in Mathematics from Virginia Tech. Before joining Netflix, he worked at MySpace, helping implement page categorization, pathing analysis, sessionization, and more. In his free time he enjoys gardening and playing sports with his 4 kids.

His favorite TV shows: Ozark, Breaking Bad, Black Mirror, Barry, and Chernobyl

Since I joined Netflix back in 2011, my favorite project has been designing and building the first version of our entertainment knowledge graph. The knowledge graph enabled us to better understand the trends of movies, TV shows, talent, and books. Building the knowledge graph offered many interesting technical challenges such as entity resolution (e.g., are these two movie names in different languages really the same?), and distributed graph algorithms in Spark. After we launched the product, analysts and scientists began surfacing new insights that were previously hidden behind difficult-to-use data. The combination of overcoming technical hurdles and creating new opportunities for analysis was rewarding.

Kevin, what drew you to data engineering?

I stumbled into data engineering rather than making an intentional career move into the field. I started my career as an application developer with basic familiarity with SQL. I was later hired into my first purely data gig where I was able to deepen my knowledge of big data. After that, I joined MySpace back at its peak as a data engineer and got my first taste of data warehousing at internet-scale.

What keeps me engaged and enjoying data engineering is giving super-suits and adrenaline shots to analytics engineers and data scientists.

When I make something complex seem simple, or create a clean environment for my stakeholders to explore, research and test, I empower them to do more impactful business-facing work. I like that data engineering isn’t in the limelight, but instead can help create economies of scale for downstream analytics professionals.

What drew you to Netflix?

My wife came across the Netflix job posting in her effort to keep us in Los Angeles near her twin sister’s family. As a big data engineer, I found that there was an enormous amount of opportunity in the Bay Area, but opportunities were more limited in LA where we were based at the time. So the chance to work at Netflix was exciting because it allowed me to live closer to family, but also provided the kind of data scale that was most common for Bay Area companies.

The company was intriguing to begin with, but I knew nothing of the talent, culture, or leadership’s vision. I had been a happy subscriber of Netflix’s DVD-rental program (no late fees!) for years.

After interviewing, it became clear to me that this company culture was different than any I had experienced.

I was especially intrigued by the trust they put in each employee. Speaking with fellow employees allowed me to get a sense for the kinds of people Netflix hires. The interview panel’s humility, curiosity and business acumen was quite impressive and inspired me to join them.

I was also excited by the prospect of doing analytics on movies and TV shows, which was something I enjoyed exploring outside of work. It seemed fortuitous that the area of analytics that I’d be working in would align so well with my hobbies and interests!

Kevin, you’ve been at Netflix for over 10 years now, which is pretty incredible. Over the course of your time here, how has your role evolved?

When I joined Netflix back in 2011, our content analytics team was just 3 people. We had a small office in Los Angeles focused on content, and significantly more employees at the headquarters in Los Gatos. The company was primarily thought of as a tech company.

At the time, the data engineering team mainly used a data warehouse ETL tool called Ab Initio, and an MPP (Massively Parallel Processing) database for warehousing. Both were appliances located in our own data center. Hadoop was being lightly tested, but only in a few high-scale areas.

Fast forward 10 years, and Netflix is now the leading streaming entertainment service — serving members in over 190 countries. In the data engineering space, very little of the same technology remains. Our data centers are retired, Hadoop has been replaced by Spark, Ab Initio and our MPP database no longer fits our big data ecosystem.

In addition to the company and tech shifting, my role has evolved quite a bit as our company has grown. When we were a smaller company, the ability to span multiple functions was valued for agility and speed of delivery. The sooner we could ingest new data and create dashboards and reports for non-technical users to explore and analyze, the sooner we could deliver results. But now, we have a much more mature business, and many more analytics stakeholders that we serve.

For a few years, I was in a management role, leading a great team of people with diverse backgrounds and skill sets. However, I missed creating data products with my own hands so I wanted to step back into a hands-on engineering role. My boss was gracious enough to let me make this change and focus on impacting the business as an individual contributor.

As I think about my future at Netflix, what motivates me is largely the same as what I’ve always been passionate about. I want to make the lives of data consumers easier and to enable them to be more impactful. As the company scales and as we continue to invest in storytelling, the opportunity grows for me to influence these decisions through better access to information and insights. The biggest impact I can make as a data engineer is creating economies of scale by producing data products that will serve a diverse set of use cases and stakeholders.

If I can build beautifully simple data products for analytics engineers, data scientists, and analysts, we can all get better at Netflix’s goal: entertaining the world.

Learning more

Interested in learning more about data roles at Netflix? You’re in the right place! Keep an eye out for our open roles in Data Science and Engineering by visiting our jobs site here. Our culture is key to our impact and growth: read about it here. To learn more about our Data Engineers, check out our chats with Dhevi Rajendran and Samuel Setegne.


Data Engineers of Netflix — Interview with Kevin Wylie was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

How Netflix uses eBPF flow logs at scale for network insight

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/how-netflix-uses-ebpf-flow-logs-at-scale-for-network-insight-e3ea997dca96

By Alok Tiagi, Hariharan Ananthakrishnan, Ivan Porto Carrero and Keerti Lakshminarayan

Netflix has developed a network observability sidecar called Flow Exporter that uses eBPF tracepoints to capture TCP flows at near real time. At much less than 1% of CPU and memory on the instance, this highly performant sidecar provides flow data at scale for network insight.

Challenges

The cloud network infrastructure that Netflix utilizes today consists of AWS services such as VPC, DirectConnect, VPC Peering, Transit Gateways, NAT Gateways, etc and Netflix owned devices. Netflix software infrastructure is a large distributed ecosystem that consists of specialized functional tiers that are operated on the AWS and Netflix owned services. While we strive to keep the ecosystem simple, the inherent nature of leveraging a variety of technologies will lead us to challenges such as:

  • App Dependencies and Data Flow Mappings: With the number of micro services growing by the day without understanding and having visibility into an application’s dependencies and data flows, it is difficult for both service owners and centralized teams to identify systemic issues.
  • Pathway Validation: Netflix velocity of change within the production streaming and studio environment can result in the inability of services to communicate with other resources.
  • Service Segmentation: The ease of the cloud deployments has led to the organic growth of multiple AWS accounts, deployment practices, interconnection practices, etc. Without having network visibility, it’s difficult to improve our reliability, security and capacity posture.
  • Network Availability: The expected continued growth of our ecosystem makes it difficult to understand our network bottlenecks and potential limits we may be reaching.

Cloud Network Insight is a suite of solutions that provides both operational and analytical insight into the cloud network infrastructure to address the identified problems. By collecting, accessing and analyzing network data from a variety of sources like VPC Flow Logs, ELB Access Logs, eBPF flow logs on the instances, etc, we can provide network insight to users and central teams through multiple data visualization techniques like Lumen, Atlas, etc.

Flow Exporter

The Flow Exporter is a sidecar that uses eBPF tracepoints to capture TCP flows at near real time on instances that power the Netflix microservices architecture.

What is BPF?

Berkeley Packet Filter (BPF) is an in-kernel execution engine that processes a virtual instruction set, and has been extended as eBPF for providing a safe way to extend kernel functionality. In some ways, eBPF does to the kernel what JavaScript does to websites: it allows all sorts of new applications to be created.

An eBPF flow log record represents one or more network flows that contain TCP/IP statistics that occur within a variable aggregation interval.

The sidecar has been implemented by leveraging the highly performant eBPF along with carefully chosen transport protocols to consume less than 1% of CPU and memory on any instance in our fleet. The choice of transport protocols like GRPC, HTTPS & UDP is runtime dependent on characteristics of the instance placement.

The runtime behavior of the Flow Exporter can be dynamically managed by configuration changes via Fast Properties. The Flow Exporter also publishes various operational metrics to Atlas. These metrics are visualized using Lumen, a self-service dashboarding infrastructure.

So how do we ingest and enrich these flows at scale ?

Flow Collector is a regional service that ingests and enriches flows. IP addresses within the cloud can move from one EC2 instance or Titus container to another over time. We use Sonar to attribute an IP address to a specific application at a particular time. Sonar is an IPv6 and IPv4 address identity tracking service.

Flow Collector consumes two data streams, the IP address change events from Sonar via Kafka and eBPF flow log data from the Flow Exporter sidecars. It performs real time attribution of flow data with application metadata from Sonar. The attributed flows are pushed to Keystone that routes them to the Hive and Druid datastores.

The attributed flow data drives various use cases within Netflix like network monitoring and network usage forecasting available via Lumen dashboards and machine learning based network segmentation. The data is also used by security and other partner teams for insight and incident analysis.

Summary

Providing network insight into the cloud network infrastructure using eBPF flow logs at scale is made possible with eBPF and a highly scalable and efficient flow collection pipeline. After several iterations of the architecture and some tuning, the solution has proven to be able to scale.

We are currently ingesting and enriching billions of eBPF flow logs per hour and providing visibility into our cloud ecosystem. The enriched data allows us to analyze networks across a variety of dimensions (e.g. availability, performance, and security), to ensure applications can effectively deliver their data payload across a globally dispersed cloud-based ecosystem.

Special Thanks To

Brendan Gregg


How Netflix uses eBPF flow logs at scale for network insight was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Data Engineers of Netflix — Interview with Dhevi Rajendran

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/data-engineers-of-netflix-interview-with-dhevi-rajendran-a9ab7c7b36e5

Data Engineers of Netflix — Interview with Dhevi Rajendran

Dhevi Rajendran

This post is part of our “Data Engineers of Netflix” interview series, where our very own data engineers talk about their journeys to Data Engineering @ Netflix.

Dhevi Rajendran is a Data Engineer on the Growth Data Science and Engineering team. Dhevi joined Netflix in July 2020 and is one of many Data Engineers who have onboarded remotely during the pandemic. In this post, Dhevi talks about her passion for data engineering and taking on a new role during the pandemic.

Before Netflix, Dhevi was a software engineer at Two Sigma, where she was most recently on a data engineering team responsible for bringing in datasets from a variety of different sources for research and trading purposes. In her free time, she enjoys drawing, doing puzzles, reading, writing, traveling, cooking, and learning new things.

Her favorite TV shows: Atlanta, Barry, Better Call Saul, Breaking Bad, Dark, Fargo, Succession, The Killing

Her favorite movies: Das Leben der Anderen, Good Will Hunting, Intouchables, Mother, Spirited Away, The Dark Knight, The Truman Show, Up

Dhevi, so what got you into data engineering?

While my background has mostly been in backend software engineering, I was most recently doing backend work in the data space prior to Netflix. One great thing about working with data is the impact you can create as an engineer.

At Netflix, the work that data engineers do to produce data in a robust, scalable way is incredibly important to provide the best experience to our members as they interact with our service.

Beyond the really interesting technical challenges that come with working with big data, there are lots of opportunities to think about higher-level domain challenges as a data engineer. In college, I had done human-computer interaction research on subtitles for the Deaf and hard-of-hearing as well as computational genomics research on Alzheimer’s disease. I’ve always enjoyed learning about new areas and combining this knowledge with my technical skills to solve real-world problems.

What drew you to Netflix?

Netflix’s mission and its culture primarily drew me to Netflix. I liked the idea of being a part of a company that brings joy to so many members around the world with an incredibly powerful platform for their stories to be heard. The blend of creativity and a strong engineering culture at Netflix really appealed to me.

The culture was also something that piqued my interest. I was pretty skeptical of Netflix’s culture memo at first. Many companies have lofty ideals that don’t necessarily translate into the reality of the company culture, so I was surprised to see how consistently the culture memo aligns with the actual culture at the company. I’ve found the culture of freedom and responsibility empowering.

Rather than the typical top-down approach many companies use, Netflix trusts each person to make the right decisions for the company by using their deep knowledge of the problems they’re solving along with the context they gather from their leaders and stakeholders.

This means a lot less red tape, a lot less friction, and a lot more freedom for everyone at the company to do what’s best for the business. I also really appreciate the amount of visibility and input we get into broader strategic decisions that the company makes.

Finally, I was also really excited about joining the Growth Data Engineering team! My team is responsible for building data products relating to how we connect with our new members around the world, which is high-impact and has far-reaching global significance. I love that I get to help Netflix connect with new members around the world and help shape the first impression we make on them.

What is your favorite project or a project that you’re particularly proud of?

I have been primarily involved in the payments space. Not a project per se, but one of the things I’ve enjoyed being involved in is the cross-functional meetings with peers and stakeholders who are working in the payments space. These meetings include product managers, designers, consumer insights researchers, software engineers, data scientists, and people in a wide variety of other roles.

I love that I get to work cross-functionally with such a diverse group of people looking at the same set of problems from a variety of unique perspectives.

In addition to my day-to-day technical work, these meetings have provided me with the opportunity to be involved in the high-level product, design, and strategic discussions, which I value. Through these cross-functional efforts, I’ve also really gotten to learn and appreciate the nuances of payments. From using credit cards (which are fairly common in the US but not as widely adopted outside the US) to physically paying in person, members in different countries prefer to pay for our subscription in a wide variety of ways. It’s incredible to see the thoughtful and deeply member-driven approach we use to think about something as seemingly routine, straightforward, and often taken for granted as payments.

What was it like taking on a new role during the pandemic?

First off, I feel very lucky to have found a new role in this very difficult period. With the amount of change and uncertainty, the past year brought, it somehow felt both fitting and imprudent to voluntarily add a career change to the mix. The prospect was daunting at first. I knew there would be a bunch for me to learn coming into Netflix, considering that I hadn’t worked with the technologies my team uses (primarily Scala and Spark). Looking back now, I’m incredibly grateful for the opportunity and glad that I took it. I’ve already learned so much in the past six months and am excited about how much more I can learn and the impact I can make going forward.

Onboarding remotely has been a unique experience as well. Building relationships and gathering broader context are more difficult right now. I’ve found that I’ve learned to be more proactive and actively seek out opportunities to get to know people and the business, whether through setting up coffee chats, reading memos, or attending meetings covering topics I want to learn more about. I still haven’t met anyone I work with in person, but my teammates, my manager, and people across the company have been really helpful throughout the onboarding process.

It’s been incredible to see how gracious people are with their time and knowledge. The amount of empathy and understanding people have shown to each other, including to those who are new to the company, has made taking the leap and joining Netflix a positive experience.

Learning more

Interested in learning more about data roles at Netflix? You’re in the right place! Keep an eye out for our open roles in Data Science and Engineering here. Our culture is key to our impact and growth: read about it here.


Data Engineers of Netflix — Interview with Dhevi Rajendran was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Data Engineers of Netflix — Interview with Samuel Setegne

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/data-engineers-of-netflix-interview-with-samuel-setegne-f3027f58c2e2

Data Engineers of Netflix — Interview with Samuel Setegne

Samuel Setegne

This post is part of our “Data Engineers of Netflix” interview series, where our very own data engineers talk about their journeys to Data Engineering @ Netflix.

Samuel Setegne is a Senior Software Engineer on the Core Data Science and Engineering team. Samuel and his team build tools and frameworks that support data engineering teams across Netflix. In this post, Samuel talks about his journey from being a clinical researcher to supporting data engineering teams.

Samuel comes from West Philadelphia, and he received his Master’s in Biotechnology from Temple University. Before Netflix, Samuel worked at Travelers Insurance in the Data Science & Engineering space, implementing real-time machine learning models to predict severity and complexity at the onset of property claims.

His favorite TV shows: Bojack Horseman, Marco Polo, and The Witcher

His favorite movies: Scarface, I Am Legend and The Old Guard

Sam, what drew you to data engineering?

Early in my career, I was headed full speed towards life as a clinical researcher. Many healthcare practitioners had strong hunches and wild theories that were exciting to test against an empirical study. I personally loved looking at raw data and using it to understand patterns in the world through technology. However, most challenges that came with my role were domain-related but not as technically demanding. For example — clinical data was often small enough to fit into memory on an average computer and only in rare cases would its computation require any technical ingenuity or massive computing power. There was not enough scope to explore the distributed and large-scale computing challenges that usually come with big data processing. Furthermore, engineering velocity was often sacrificed owing to rigid processes.

Moving into pure Data engineering not only offered me the technical challenges I’ve always craved for but also the opportunity to connect the dots through data which was the best of both worlds.

What is your favorite project or a project you’re particularly proud of?

The very first project I had the opportunity to work on as a Netflix contractor was migrating all of Data Science and Engineering’s Python 2 code to Python 3. This was without a doubt, my favorite project that also opened the door for me to join the organization as a full-time employee. It was thrilling to analyze code from various cross-functional teams and learn different coding patterns and styles.

This kind of exposure opened up opportunities for me to engage with various data engineering teams and advocate for python best practices that helped me drive greater impact at Netflix.

What drew you to Netflix?

What initially caught my attention about a chance to work at Netflix was the variety and quality of content. My family and friends were always ecstatic about having lively and raucous conversations about Netflix shows or movies they recently watched like Marco Polo and Tiger King.

Although other great companies play a role in our daily lives, many of them serve as a kind of utility, whereas Netflix is meant to make us live, laugh, and love by enabling us to experience new voices, cultures, and perspectives.

After I read Netflix’s culture memo, I was completely sold. It precisely described what I always knew was missing in places I’ve worked before. I found the mantra of “people over process” extremely refreshing and eventually learned that it unlocked a bold and creative part of me in my technical designs. For instance, if I feel that a design of an application or a pipeline would benefit from new technology or architecture, I have the freedom to explore and innovate without excessive red tape. Typically in large corporations, you’re tied to strict and redundant processes, causing a lot of fatigue for engineers. When I landed at Netflix, it was a breath of fresh air to learn that we lean into freedom and responsibility and allow engineers to push the boundaries.

Sam, how do you approach building tools/frameworks that can be used across data engineering teams?

My team provides generalized solutions for common and repetitive data engineering tasks. This helps provide “paved path” solutions for data engineering teams and reduces the burden of re-inventing the wheel. When you have many specialized teams composed of highly skilled engineers, the last thing you want for a data engineer is to spend too much time solving small problems that are usually buried inside of the big, broad, and impactful problems. When we extrapolate that to every engineer on every Data Science & Engineering team, it easily adds up and is something worth optimizing.

Any time you have a data engineer spending cycles working on tasks where the data engineering part of their brain is turned off, that’s an opportunity where better tooling can help.

For example, many data engineering teams have to orchestrate notification campaigns when they make changes to critical tables that have downstream dependencies. This is achievable by a Data Engineer but it can be very time-consuming, especially having to track the migration of these downstream users over to your new table or table schema to ensure it’s safe to finalize your changes. This problem was tackled by one of my highly skilled team members who built a centralized migration service that lets Data Engineers easily start “migration campaigns” that can automatically identify downstream users and provide notification and status-tracking capabilities by leveraging Jira. The aim is to enable Data Engineers to quickly fire up one of these campaigns and keep an eye out for its completion while using that extra time to focus on other tasks.

By investing in the right tooling to streamline redundant (yet necessary) tasks, we can drive higher data engineering productivity and efficiency, while accelerating innovation for Netflix.

Learning more

Interested in learning more about data roles at Netflix? You’re in the right place! Keep an eye out for our open roles in Data Science and Engineering by visiting our jobs site here. Our culture is key to our impact and growth: read about it here. Check out our chat with Dhevi Rajendran to know more about starting a new role as a Data Engineer during the pandemic here.


Data Engineers of Netflix — Interview with Samuel Setegne was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Managing complexity in Zabbix installations with Splunk

Post Syndicated from Christian Anton original https://blog.zabbix.com/managing-complexity-in-zabbix-installations-with-splunk/13053/

A big data analytics engine can be used to optimize large and complex Zabbix installations: keeping track of the amount and kind of problems over time, top alert producers, and much more. You can employ Splunk to optimize and analyze vital Zabbix runtime parameters, such as ‘unsupported items,’ repeatedly happening host availability issues, misconfigured agents, and Zabbix Queue entries.

Contents

I. Complexity (1:15)
II. Zabbix entity inventory (8:28)
III. Use cases (15:16)
IV. Conclusion (20:09)
V. Questions & Answers (21:41)

secadm GmbH is a service provider located in the south of Germany. The company with a strong background in monitoring and automation, network infrastructure, and security software development supports customers of all sizes to manage their IT infrastructures. secadm GmbH is a Zabbix partner and also a Zabbix training partner.

Complexity

Operating a Zabbix deployment of a specific size comes with some challenges:

  • A huge number of hosts, templates, items, host groups, macros, and configuration elements inside your Zabbix instance.
  • LLD rules/unsupported items — items that are unable to fetch information, for example, a wrong password or a wrong path, in an external check. It is often hard to get a hold of how many of those you have and in which of the various error states. Therefore, it’s also difficult to fix them.
  • Host availability/network issues — errors that you see only in the logs — things going up and down, losing their connectivity, but getting back in time before issuing an alert.
  • Queue entries. In larger Zabbix installations, you might have ten thousand or even more items in this service queue. Zabbix actually tells you that some items do not receive their data in time. Zabbix shows that something is really wrong, though it doesn’t give a hint about what is wrong.
  • Zabbix as a monitoring tool is there to actually generate problems and alerts out of these problems. Many problems often cause ‘alert fatigue’ when people start ignoring monitoring results because of too many alerts.

Therefore, we receive a lot of questions from our customers, such as:

  • Where do all these problems come from?
  • What are the hosts generating most of the problems, at what times, and generated by what templates?
  • Did the latest change/upgrade have any negative impact on our monitoring?
  • Can you get rid of unsupported items?
  • How many hosts have specific problems (for instance, caused by a known bug in an old version of an agent that behaves strangely with a specific version of the Zabbix server), and what would be the effect if we fixed those problems?
  • Where do all these queue entries come from?

Zabbix is a transparent and predictable monitoring tool that offers great ways to organize the monitored elements with templates and macros. Zabbix also offers excellent visualization capabilities. However, Zabbix is not an analytical utility offering a flexible query language to gather the required information in the required format, having on-demand statistical functions, and allowing you to enrich and correlate data with the data from arbitrary sources. So, extra tools will have to do the extra work.

Secadm GmbH being the partner of Zabbix and Splunk, has concluded that it’s obvious to use Splunk for such extra work. Splunk is offering many possibilities to onboard data in the platform far beyond the simple indexing of log data, looking up the Key-Value store, implementing scripts and programs inside the Splunk platform to fetch data in real-time and on-demand out of other systems without having to store and to index any kind of information, as well as performing custom search commands.

Zabbix entity inventory

The most important Zabbix data used for analysis — the inventory of all elements inside Zabbix that do not often change, such as:

  • Hosts,
  • Items,
  • Proxies,
  • Templates,
  • Triggers,
  • Discovery rules (LLD),
  • Item Prototypes, and
  • Trigger Prototypes.

As this data is not changing constantly, we fetch this data from Splunk with the scheduled search and custom search command directly from the API endpoints in Zabbix. Then we can store this information inside the Splunk KV Store, which is, in fact, the MongoDB allowing us to perform searches in milliseconds without having to index any data and quickly get the results.

Zabbix entity inventory

So, you can get statistics on status and state to drill down on the unsupported items for a list of all of the items. You can further identify the correlation for the hostnames instead of host IDs, which are not human-readable. The hostnames are available at the KV Store, which stores the hosts with their metadata. You can also identify how many unsupported items there are on each host.

You can also get information on the hostnames, hosts, item names, item types, and errors. You can categorize the problems as SNMP problems, shell problems such as wrong paths, and see how often certain problems happen and what hosts are assigned to what templates and host groups, and so on. This information may also be aggregated or correlated with information from UCMDB.

More data

More fun than having data within a data analytics platform has more data.

  • Indexing the Zabbix Server / Proxy Logs logs, categorizing events to identify availability issues, item problems, preprocessing problems, housekeepers statistics, etc.

  • A module to fetch information from Zabbix (item, host, trigger) in real-time.

  • Gathering metrics (History / Trends data) directly from Zabbix in real-time without the need to store these metrics in any place other than the Zabbix database. We can still use the data for graphing, correlations, calculations, etc.

  • Onboarding the Zabbix problems into Splunk by using the new custom Media types — Webhook.

Custom Media type

  • Correlation of the alert logs, which are new and available through the API since Zabbix 5.0.
  • Working on the queue items to solve these questions.

Use cases

Zabbix queue

Zabbix queue may be a real headache as you can wait for a Zabbix installation with 20,000-50,000 items for 5 or 10 minutes or even longer.

In this dashboard, the same view is displayed in Zabbix: items are categorized by overtime, item type, proxy, etc. Splunk here offers what Zabbix fails — the history so that you can see the spikes when things have changed dramatically. For instance, when more significant network changes happen, the network slows down, and the queues grow dramatically. You can see whether these queues have gone back down or remained up. This information is complicated to analyze in Zabbix.

You can also drill down to see the items correlated with their actual status and the host’s status inside Zabbix. So, you can clearly see, for instance, that an item is on the host that is down or in the queue as it’s not supported and doesn’t get any data.

Here, there is also an Ignore list. So you can get statistics for the remaining items and group them, for instance, by Item type. You can go further and analyze and fix the problems.

Zabbix problems analytics

Zabbix problems dashboard

In this dashboard, Zabbix problems are displayed by system categories. For instance, we can see that over the last 24 hours, Windows caused most of the problems.

Here, we can also drill down to see, for instance, if there are many similar problems. You can go further to identify a single issue that has caused many alerts or problems. You can see that one host is creating almost all of the problems. So, if you switch this one host off, you would have fewer problems.

Zabbix data for management visibility

We can use Zabbix data for greater management visibility, such as:

  • Correlation of data to generate meaningful dashboards:

— Zabbix (metrics, status, problems, etc.),
— application logs,
— other data sources,
— inventory (CMDB, …)

  • Business-level visualization.

Conclusion

Splunk is open-source software and is distributed for free. We are currently in the process of integrating Splunk with Zabbix.

If you are interested in Splunk, you can send a request to [email protected]  or look for Christian Anton on LinkedIn or Instagram.

Questions & Answers

Question. If we use this kind of integration, are there any performance issues caused by Splunk or some misconfiguration?

Answer. We have been using Splunk for installations with several tens of thousands of monitored hosts and from hundreds of thousands up to millions of items and have not seen any performance implications.

Question. How does this connector work under the hood? Does it use the API or direct queries to the database?

Answer. We rely on the API. Besides, we can fetch the data directly from the database.

 

Optimizing data warehouse storage

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/optimizing-data-warehouse-storage-7b94a48fdcbe

By Anupom Syam

Background

At Netflix, our current data warehouse contains hundreds of Petabytes of data stored in AWS S3, and each day we ingest and create additional Petabytes. At this scale, we can gain a significant amount of performance and cost benefits by optimizing the storage layout (records, objects, partitions) as the data lands into our warehouse.

There are several benefits of such optimizations like saving on storage, faster query time, cheaper downstream processing, and an increase in developer productivity by removing additional ETLs written only for query performance improvement. On the other hand, these optimizations themselves need to be sufficiently inexpensive to justify their own processing cost over the gains they bring.

We built AutoOptimize to efficiently and transparently optimize the data and metadata storage layout while maximizing their cost and performance benefits.

This article will list some of the use cases of AutoOptimize, discuss the design principles that help enhance efficiency, and present the high-level architecture. Then deep dive into the merging use case of AutoOptimize and share some results and benefits.

Use cases

We found several use cases where a system like AutoOptimize can bring tons of value. Some of the optimizations are prerequisites for a high-performance data warehouse. Sometimes Data Engineers write downstream ETLs on ingested data to optimize the data/metadata layouts to make other ETL processes cheaper and faster. The goal of AutoOptimize is to centralize such optimizations that will remove duplicate work and while doing it more efficiently than vanilla ETLs.

Merge

As the data lands into the data warehouse through real-time data ingestion systems, it comes in different sizes. This results in a perpetually increasing number of small files across the partitions. Merging those numerous smaller files into a handful of larger files can make query processing faster and reduce storage space.

Sort

Presorted records and files in partitions make queries faster and save significant amounts of storage space as it enables a higher level of compression. We already had some existing tables with sorting stages to reduce table storage and improve downstream query performance.

Compaction

Modern data warehouses allow updating and deleting pre-existing records. Iceberg plans to enable this in the form of delta files. Over time, the number of delta files grows, and compacting them to their source files can make the read operations more optimal.

Metadata optimization

In Iceberg, the physical partitioning is decoupled from logical partitioning by keeping a map to file locations in the metadata. This enables us to add additional indexes in the metadata to make point queries more optimal. We can also reorganize the metadata to make file scanning much faster.

Design Principles

For AutoOptimize to efficiently optimize the data layout, we’ve made the following choices:

  1. Just in time vs. periodic optimization
    Only optimize a given data set when required (based on what changed) instead of blind periodic runs.
  2. Essential vs. complete optimization
    Allow users to optimize at the point of diminishing returns instead of a binary setting. For example, we allow a partition to have a few small files instead of always merging files in perfect sizes.
  3. Minimum replacement vs. full overwrite
    Only replace the required minimum amount of files instead of a full sweep overwrite.

These principles reduce resource usage by being more efficient and effective while lowering the end-to-end latency in data processing.

Other than these principles, there are some other design considerations to support and enable:

  • Multi-tenancy with database and table prioritization.
  • Both automatic (event-driven) as well as manual (ad-hoc) optimization.
  • Transparency to end-users.

High-Level Design

AutoOptimize High-Level Design

AutoOptimize is split into 2 subsystems (Service and Actors) to decouple the decisions from the actions at a high level. This decoupling of responsibilities helps us to design, manage, use, and scale the subsystems independently.

AutoOptimize Service

The service is the decision-maker. It decides what to do and when to do in response to an incoming event. It is responsible for listening to incoming events and requests and prioritizing different tables and actions to make the best usage of the available resources.

The work done in the service can be further broken down into the following 3 steps:

Observe: Listen to changes in the warehouse in near real-time. Also, respond to ad-hoc requests created manually by end-users.

Orient: Gather tuning parameters for a particular table that changed. Also, adjust the resource allocation for the table or the number of actors depending on the backlog.

Decide: Determine the highest value action with the right parameters for this particular change and when to act depending on how the action falls in the global priority across all tables and actions.

In AutoOptimize, the service is a cluster of Java (Spring Boot) applications using Redis to keep the states.

AutoOptimize Actors

Actors in AutoOptimize are responsible for the actual work (merging/sorting/compaction etc.). The AutoOptimize Service sends commands to the actors that specify what to do. The job of Actors is to perform those commands in a distributed and fault-tolerant manner.

Actors in AutoOptimize are a pool of long-running Spark jobs managed by the AutoOptimize service.

This was not intentional but we found that the way we modularized AutoOptimize’s decision-making workflow is very similar to the OODA loop and decided to use the same taxonomy.

Other Components

Iceberg
We use Apache Iceberg as the table format. AutoOptimize relies on some of the Iceberg specific features such as snapshot and atomic operations to perform the optimizations in an accurate and scalable manner.

AutoAnalyze
In short, AutoAnalyze finds the best tuning/configuration parameters for a table. It uses “What-If” experiments and previous experiences and heuristics to find the most fitting attributes for a table. We will publish a follow-up blog post about AutoAnalyze in the future. For AutoOptimize, it may find if a table needs file merging or suggest a target file size and other parameters.

Deep Dive into File Merge

File merge is the first use-case that we built for AutoOptimize. Previously we had our homegrown system called Ursula responsible for data ingestion into the Hive based warehouse. The Ursula based pipeline also performed file merges on the ingested table partitions periodically. Since then, we have moved our ingestion to Keystone and our table layout to Iceberg.

The migration out of Ursula to Keystone/Iceberg based ingestion initiated the need for a replacement for Ursula file merge. File merging is necessary for a low latency streaming ingestion pipeline as data often arrive late and unevenly. The number of small files cripples across partitions over time and can have some serious side effects like:

  1. Slowing down queries.
  2. More processing resources.
  3. Increase in storage space.

The goal of File merge in AutoOptimize is to efficiently reduce the side effects while not adding additional latency to the data pipeline.

Solutions

This section will discuss some of the solutions that helped us achieve the previously stated goals.

Just in time optimization

AutoOptimize file merge gets triggered via table change events. This allows AutoOptimize to act right away with a minimum lag. But the problem with being event-driven is it’s expensive to scan the changed partitions every time they change. If we can determine “how noisy” a partition is from the changesets in a rolling manner, we will eliminate unnecessary full partition scanning with early signals from snapshots.

Essential work

After a full partition scan, AutoOptimize gets a more comprehensive view of the state of the partition. We can get a more accurate state of the partition at this stage and avoid non-essential work.

Partition Entropy
We introduced a concept called Partition Entropy (PE) used for early pruning at each step to reduce actual work. It’s a set of stats about the state of the partition. We calculate this in a rolling manner after each snapshot scan and more exhaustively after each partition scan.

The parts of PE that deal with file sizes are called File Size Entropy (FSE). FSE of a partition is derived from the Mean Squared Error (MSE) of file sizes in a partition. We will use the terms FSE and MSE interchangeably.

We use the standard Mean Squared Error formula:

Where,

N = Number of files in the partition
Target = Target File Size
Actual = min(Actual File Size, Target)

When a partition is scanned, it’s easy to calculate the MSE using the above formula as we know the sizes of all files in that partition. We store the MSE and N for each partition in Redis for later use.

At the snapshot scan stage, we get a commit definition containing the list of files and their metadata (like size, number of records, etc.) that got added and deleted in the commit. We calculate the new MSE’ of a changed partition in a rolling manner from the snapshot information and the previously stored stats using this formula:

Where,

M = Number of files added in the snapshot.
Target = Target File Size.
Actual = min(Actual File Size, Target)
N = Previously stored number of files in the partition.
MSE = Previously stored MSE.

We have a tolerance threshold (T) for each partition and skip further processing of the partition if MSE < T². This helps us significantly reduce the number of full partition scans at the snapshot scan step and the number of actual merges in the partition scan stage.

Entropy-Based Filtering

The actual formulas are a little bit more complicated than what stated here, as we need to take care of deleted files and some other edge cases. We could also use Mean Absolute Error but we want to be biased towards outliers — as the goal is to have a more even file size in a partition than having a mixed bag of different sizes with some perfect sized files.

Minimum replacement

Once we start processing a partition, we find the minimum amount of work needed to reduce the File Size Entropy and thus reduce the number of small files.

We use 2 different packing algorithms to achieve this:

Knuth/Plass line breaking algorithm
We use this strategy when the sort order among files is important. With a correct error function (ex: Error²), this algorithm helps minimize MSE with a bounding run time of O(n²).

First Fit Decreasing bin packing algorithm
We use a modified version of the original FFD algorithm if we can ignore the sort order. This helps reduce the number of replacements with an O(nlog(n)) running time.

These methods help us smooth out the file size histogram while doing it optimally with minimal file replacement.

Multi-tenancy

AutoOptimize is multi-tenant; that is, it runs on many different databases and tables. When running the optimizations, it also needs to prioritize and allocate resources at different levels for different tasks. It requires answering questions like which table should be processed first or get more resource bandwidth or what optimization gives the most ROI.

To support multi-tenancy and tasks prioritization, it needs to have the following properties:

  • Weighted resource sharing across different priorities.
  • Fair resource sharing across different tables and tasks with the same priority.
  • Handle bursts to prevent starvation.

We use different types of Weighted Fair Queue implementations inside AutoOptimize, including different combinations of the followings:

  1. Weighted Round Robin
  2. Deficit Weighted Round Robin
  3. Fixed Priority Preemptive

Reliable Priority Queue
To support prioritization and fair resource usage, we introduced a concept called Reliable Priority Queue (RPQ) in AutoOptimize. A reliable queue does not lose items if the subscriber fails to process the items after a dequeue. An RPQ also has a sense of prioritization across different items while being reliable. The concept is fairly similar to the default Redis RPOPLPUSH reliable queue pattern. But for AutoOptimize’s use case, we use Sorted Sets instead of lists to enable prioritization.

The goal of AutoOptimize is to optimize the warehouse with a holistic perspective. Making it multi-tenant with a notion of different priorities helps us make the most optimal resource allocation.

Results

22% reduction in partition scans

2% reduction in merge actions

72% reduction in file replacements

These savings are stacked on top of each other as they are applied in sequence in the AutoOptimize pipeline. This results in a massive reduction in actual processing need while reducing the number of files by 80%.

80% reduction in the number of files

70% saving in compute

We are using 70% less compute instances than our previous merge implementation.

We also see up to 60% improvement in query performance and an additional 1% saving in storage.

Benefits

Increase processing efficiency: As AutoOptimize uses file replacement and can avoid processing by filtering early, it can save processing costs by skipping files that are not required to be merged.

Increase storage efficiency: AutoOptimize helps save storage costs by enabling AutoAnalyze recommendations to sort the records.

Reduce lag: Periodic overwrite ETLs take more time as it works in batches. AutoOptimize reduces end to end lag in data processing by optimizing as we go.

Faster query: A smaller number of files results in smaller file scanning, fewer network calls, and makes queries faster.

Ease of use: AutoOptimize provides a frictionless way to setup optimization with minimum maintenance overhead from Data Engineering.

Developer productivity: Instead of adding an ETL per table for merging, which adds ongoing incremental maintenance cost, we have a single solution that can transparently scale to many tables.

Conclusion

We believe the problems we faced at Netflix are not unique, and some of the techniques and design considerations we made can be applied more generally. By laying out the data intelligently as they are ingested into the warehouse, we are removing complexities for Data Engineers and accelerating the end-to-end pipeline. At the same time, we are gaining a significant amount of performance and cost improvement by optimizing only when it makes sense. We plan to extend AutoOptimize into other use cases and integrate it more with the Iceberg ecosystem in the future.


Optimizing data warehouse storage was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Bulk vs Individual Compression

Post Syndicated from Bozho original https://techblog.bozho.net/bulk-vs-individual-compression/

I’d like to share something very brief and very obvious – that compression works better with large amounts of data. That is, if you have to compress 100 sentences you’d better compress them in bulk rather than once sentence at a time. Let me illustrate that:

public static void main(String[] args) throws Exception {
    List<String> sentences = new ArrayList<>();
    for (int i = 0; i < 100; i ++) {
        StringBuilder sentence = new StringBuilder();
        for (int j = 0; j < 100; j ++) { 
          sentence.append(RandomStringUtils.randomAlphabetic(10)).append(" "); 
        } 
        sentences.add(sentence.toString()); 
    } 
    byte[] compressed = compress(StringUtils.join(sentences, ". ")); 
    System.out.println(compressed.length); 
    System.out.println(sentences.stream().collect(Collectors.summingInt(sentence -> compress(sentence).length)));
}

The compress method is using commons-compress to easily generate results for multiple compression algorithms:

public static byte[] compress(String str) {
   if (str == null || str.length() == 0) {
       return new byte[0];
   }
   ByteArrayOutputStream out = new ByteArrayOutputStream();
   try (CompressorOutputStream gzip = new CompressorStreamFactory()
           .createCompressorOutputStream(CompressorStreamFactory.GZIP, out)) {
       gzip.write(str.getBytes("UTF-8"));
       gzip.close();
       return out.toByteArray();
   } catch (Exception ex) {
       throw new RuntimeException(ex);
   }
}

The results are as follows, in bytes (note that there’s some randomness, so algorithms are not directly comparable):

Algorithm Bulk Individual
GZIP 6590 10596
LZ4_FRAMED 9214 10900
BZIP2 6663 12451

Why is that an obvious result? Because of the way most compression algorithms work – they look for patterns in the raw data and create a map of those patterns (a very rough description).

How is that useful? In big data scenarios where the underlying store supports per-record compression (e.g. a database or search engine), you may save a significant amount of disk space if you bundle multiple records into one stored/indexed record.

This is not a generically useful advice, though. You should check the particular datastore implementation. For example MS SQL Server supports both row and page compression. Cassandra does compression on an SSTable level, so it may not matter how you structure your rows. Certainly, if storing data in files, storing it in one file and compressing it is more efficient than compressing multiple files separately.

Disk space is cheap so playing with data bundling and compression may be seen as premature optimization. But in systems that operate on large datasets it’s a decision that can save you a lot of storage costs.

The post Bulk vs Individual Compression appeared first on Bozho's tech blog.

Running a high-performance SAS Grid Manager cluster on AWS with Amazon FSx for Lustre

Post Syndicated from Neelam original https://aws.amazon.com/blogs/big-data/running-a-high-performance-sas-grid-manager-cluster-on-aws-with-amazon-fsx-for-lustre/

SAS® is a software provider of data science and analytics used by enterprises and government organizations. SAS Grid is a highly available, fast processing analytics platform that offers centralized management that balances workloads across different compute nodes. This application suite is capable of data management, visual analytics, governance and security, forecasting and text mining, statistical analysis, and environment management. SAS and AWS recently performed testing using the Amazon FSx for Lustre shared file system to determine how well a standard workload performs on AWS using SAS Grid Manager. For more information about the results, see the whitepaper Accelerating SAS Using High-Performing File Systems on Amazon Web Services.

In this post, we take a look at an approach to deploy underlying AWS infrastructure to run SAS Grid with FSx for Lustre that you can also apply to similar applications with demanding I/O requirements.

System design overview

Running high-performance workloads that use throughput heavily, with sensitivity to network latency, requires approaches outside of typical applications. AWS generally recommends that applications span multiple Availability Zones for high availability. In the case of latency sensitivity, high throughput applications traffic should be local for optimal performance. To maximize throughput, you can do the following:

  • Run in a virtual private cloud (VPC), using instance types that support enhanced networking
  • Run instances in the same Availability Zone
  • Run instances within a placement group

The following diagram illustrates the SAS Grid with FSx for Lustre architecture on AWS.

SAS Grid architecture consists of mid-tier nodes, metadata servers, and Grid compute nodes. The mid-tier nodes are responsible for running the Platform Web Services (PWS) and Load Sharing Facility (LSF) components. These components dispatch jobs submitted and return the status of each job.

To effectively run PWS and LSF on mid-tier nodes, you need Amazon Elastic Compute Cloud (Amazon EC2) instances with high memory. For this use case, the r5 instance family would meet this requirement.

Metadata servers contain the metadata repository that stores the metadata definitions of all SAS Grid manager products, which the r5 instance family can also serve effectively. We recommend either meeting or exceeding the recommended memory requirement of 24 GB of RAM or 8 MB per physical core (whichever is larger). Metadata servers don’t need compute-intensive resources or high I/O bandwidth; therefore, you can choose the r5 instance family for a balance of price and performance.

SAS Grid nodes are responsible for executing the jobs received by the grid, and EC2 instances capable of handling these jobs depend on the size, complexity, and volume of the work the grid performs. To meet the minimum requirements of SAS Grid workloads, we recommend having a minimum of 8 GB of physical RAM per core and a robust I/O throughput of 100–125 MB/second per physical core. For this use case, EC2 instance families of m5n and r5n suffice in meeting the RAM and throughput requirements. You can host SASDATA, SASWORK, and UTILLOC libraries in a shared file system. If you choose to offload SASWORK to instance storage, the i3en instance family meets this need because they support instance storage over 1.2 TB. In the next section, we take a look at how throughput testing was performed to arrive at the EC2 instance recommendations with FSx for Lustre.

Steps to maximize storage I/O performance

SAS Grid requires a shared file system, and we wanted to benchmark the performance of FSx for Lustre as the chosen shared file system against various EC2 instance families that meet the minimum requirements of 8 GB of physical RAM per core and 100–125 MB/second throughput per physical core.

FSx for Lustre is a fully managed file storage service designed for applications that require fast storage. As a POSIX-compliant file system, you can use FSx for Lustre with current Linux-based applications without having to make any changes. Although FSx for Lustre offers a choice between scratch and persistent type file systems, we recommend for SAS Grid to use persistent type FSx for Lustre file system because you need to store the SASWORK, SASDATA, and UTILLOC data and libraries for longer periods with high availability and data durability. To meet I/O throughput, make sure to select the appropriate storage capacity for throughput per unit of storage to achieve the desired range of 100–125 MB/second.

After setting up the file system, we recommend mounting FSx for Lustre with the flock mount option. The following code example is a mount command and mount option for FSx for Lustre:

$ sudo mount -t lustre -o noatime,flock fs-0123456789abcd.fsx.us-west- [email protected]:/za3atbmv /fsx
$ mount -t lustre
[email protected]:/za3atbmv on /fsx type lustre

(rw,noatime,seclabel,flock,lazystatfs)

Throughput testing and results

To select the best-placed EC2 instances for running SAS Grid with FSx for Lustre, we ran a series of highly parallel network throughput tests from individual EC2 instances against a 100.8 TiB persistent file system that had an aggregate throughput capacity of 19.688 GB/second. We ran these tests in multiple regions using multiple EC2 instance families (c5, c5n, i3, i3en, m5, m5a, m5ad, m5n, m5dn, r5, r5a, r5ad, r5n, and r5dn). The tests ran for 3 hours for each instance, and the DataWriteBytes metric of the file system was recorded every 1 minute. Only one instance was accessing the file system at a time, and the p99.9 results were captured. The metrics were consistent across all four Regions.

We observed that the i3en, m5n, m5dn, r5n, and r5dn EC2 instance families meet or exceed the minimum network performance and memory recommendations. For more information about the performance results, see the whitepaper Accelerating SAS Using High-Performing File Systems on Amazon Web Services. The i3 instance family is just shy of meeting the minimum network performance. If you want to use the instance storage for SASWORK and UTILLOC libraries, you can consider i3en instances.

M5n and r5n are a good blend of price and performance, and we recommend the m5n instance family for SAS Grid nodes. However, if your workload is memory bound, consider using r5n instances, which provide higher memory per physical core for a higher price point than m5n instances.

We also ran rhel_iotest.sh, which is available from the SAS technical support samples tool repository (SASTSST), using the same FSx for Lustre configuration as mentioned earlier. The following table shows the read and write performance per physical core for a variety of instances sizes in the m5n and r5n families.

Instance Type

Variable Network Performance Peak per Physical Core
Read (MB/second) Write (MB/second)
m5n.large 850.20 357.07
m5n.xlarge 519.46 386.25
m5n.2xlarge 283.01 446.84
m5n.4xlarge 202.89 376.57
m5n.8xlarge 154.98 297.71
r5n.large 906.88 429.93
r5n.xlarge 488.36 455.76
r5n.2xlarge 256.96 471.65
r5n.4xlarge 203.31 390.03
r5n.8xlarge 149.63 299.45

To take advantage of the elasticity, scalability, and flexibility of the cloud, we recommend spreading the SAS Grid and compute workload over a larger number of smaller instances versus using a smaller number of larger instances. For mid-tier, use a minimum of two instances, and for metadata servers, we recommend a minimum of three instances for the SAS Grid architecture.

Conclusions

Before FSx for Lustre file system, you either had to use Amazon Elastic File System (Amazon EFS) or a third-party file system from AWS Marketplace and Amazon Elastic Block Store (Amazon EBS) for the SASWORK, SASDATA, and UTILLOC libraries and storage data. Each storage option came with its own settings and limitations, which caused loss in performance. With FSx for Lustre, you have a single solution for all SAS Grid storage requirements, which allows you to focus on running your business instead of maintaining a file system. We recommend that SAS admin deploy SAS Grid with m5n and r5n instances for SAS Grid compute nodes when accessing FSx for Lustre file system.

If you have questions or suggestions, please leave a comment.

Hyper Scale VPC Flow Logs enrichment to provide Network Insight

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/hyper-scale-vpc-flow-logs-enrichment-to-provide-network-insight-e5f1db02910d

How Netflix is able to enrich VPC Flow Logs at Hyper Scale to provide Network Insight

By Hariharan Ananthakrishnan and Angela Ho

The Cloud Network Infrastructure that Netflix utilizes today is a large distributed ecosystem that consists of specialized functional tiers and services such as DirectConnect, VPC Peering, Transit Gateways, NAT Gateways, etc. While we strive to keep the ecosystem simple, the inherent nature of leveraging a variety of technologies will lead us to complications and challenges such as:

  • App Dependencies and Data Flow Mappings: Without understanding and having visibility into an application’s dependencies and data flows, it is difficult for both service owners and centralized teams to identify systemic issues.
  • Pathway Validation: Netflix velocity of change within the production streaming environment can result in the inability of services to communicate with other resources.
  • Service Segmentation: The ease of the cloud deployments has led to the organic growth of multiple AWS accounts, deployment practices, interconnection practices, etc. Without having network visibility, it’s not possible to improve our reliability, security and capacity posture.
  • Network Availability: The expected continued growth of our ecosystem makes it difficult to understand our network bottlenecks and potential limits we may be reaching.

Cloud Network Insight is a suite of solutions that provides both operational and analytical insight into the Cloud Network Infrastructure to address the identified problems. By collecting, accessing and analyzing network data from a variety of sources like VPC Flow Logs, ELB Access Logs, Custom Exporter Agents, etc, we can provide Network Insight to users through multiple data visualization techniques like Lumen, Atlas, etc.

VPC Flow Logs

VPC Flow Logs is an AWS feature that captures information about the IP traffic going to and from network interfaces in a VPC. At Netflix we publish the Flow Log data to Amazon S3. Flow Logs are enabled tactically on either a VPC or subnet or network interface. A flow log record represents a network flow in the VPC. By default, each record captures a network internet protocol (IP) traffic flow (characterized by a 5-tuple on a per network interface basis) that occurs within an aggregation interval.

version vpc-id subnet-id instance-id interface-id account-id type srcaddr dstaddr srcport dstport pkt-srcaddr pkt-dstaddr protocol bytes packets start end action tcp-flags log-status
3 vpc-12345678 subnet-012345678 i-07890123456 eni-23456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43416 5001 52.213.180.42 10.0.0.62 6 568 8 1566848875 1566848933 ACCEPT 2 OK

The IP addresses within the Cloud can move from one EC2 instance or Titus container to another over time. To understand the attributes of each IP back to an application metadata Netflix uses Sonar. Sonar is an IPv4 and IPv6 address identity tracking service. VPC Flow Logs are enriched using IP Metadata from Sonar as it is ingested.

With a large ecosystem at Netflix, we receive hundreds of thousands of VPC Flow Log files in S3 each hour. And in order to gain visibility into these logs, we need to somehow ingest and enrich this data.

So how do we ingest all these s3 files?

At Netflix, we have the option to use Spark as our distributed computing platform. It is easier to tune a large Spark job for a consistent volume of data. As you may know, S3 can emit messages when events (such as a file creation events) occur which can be directed into an AWS SQS queue. In addition to the s3 object path, these events also conveniently include file size which allows us to intelligently decide how many messages to grab from the SQS queue and when to stop. What we get is a group of messages representing a set of s3 files which we humorously call “Mouthfuls”. In other words, we are able to ensure that our Spark app does not “eat” more data than it was tuned to handle.

We named this library Sqooby. It works well for other pipelines that have thousands of files landing in s3 per day. But how does it hold up to the likes of Netflix VPC Flow Logs that has volumes which are orders of magnitude greater? It didn’t. The primary limitation was that AWS SQS queues have a limit of 120 thousand in-flight messages. We found ourselves needing to hold more than 120 thousand messages in flight at a time in order to keep up with the volumes of files.

Requirements

There are multiple ways you can solve this problem and many technologies to choose from. As with any sustainable engineering design, focusing on simplicity is very important. This means using existing infrastructure and established patterns within the Netflix ecosystem as much as possible and minimizing the introduction of new technologies.

Equally important is the resilience, recoverability, and supportability of the solution. A malformed file should not hold up or back up the pipeline (resilience). If unexpected environmental factors cause the pipeline to get backed up, it should be able to recover by itself. And excellent logging is needed for debugging purposes and supportability. These characteristics allow for an on-call response time that is relaxed and more in line with traditional big data analytical pipelines.

Hyper Scale

At Netflix, our culture gives us the freedom to decide how we solve problems as well as the responsibility of maintaining our solutions so that we may choose wisely. So how did we solve this scale problem that meets all of the above requirements? By applying existing established patterns in our ecosystem on top of Sqooby. In this case, it’s a pattern which generates events (directed into another AWS SQS queue) whenever data lands in a table in a datastore. These events represent a specific cut of data from the table.

We applied this pattern to the Sqooby log tables which contained information about s3 files for each Mouthful. What we got were events that represented Mouthfuls. Spark could look up and retrieve the data in the s3 files that the Mouthful represented. This intermediate step of persisting Mouthfuls allowed us to easily “eat” through S3 event SQS messages at great speed, converting them to far fewer Mouthful SQS Messages which would each be consumed by a single Spark app instance. Because we ensured that our ingestion pipeline could concurrently write/append to the final VPC Flow Log table, this meant that we could scale out the number of Spark app instances we spin up.

Tuning for Hyper Scale

On this journey of ingesting VPC flow logs, we found ourselves tweaking configurations in order to tune throughput of the pipeline. We modified the size of each Mouthful and tuned the number of Spark executors per Spark app while being mindful of cluster capacity. We also adjusted the frequency in which Spark app instances are spun up such that any backlog would burn off during a trough in traffic.

Summary

Providing Network Insight into the Cloud Network Infrastructure using VPC Flow Logs at hyper scale is made possible with the Sqooby architecture. After several iterations of this architecture and some tuning, Sqooby has proven to be able to scale.

We are currently ingesting and enriching hundreds of thousands of VPC Flow Logs S3 files per hour and providing visibility into our cloud ecosystem. The enriched data allows us to analyze networks across a variety of dimensions (e.g. availability, performance, and security), to ensure applications can effectively deliver their data payload across a globally dispersed cloud-based ecosystem.

Special Thanks To

Bryan Keller, Ryan Blue


Hyper Scale VPC Flow Logs enrichment to provide Network Insight was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

AWS Architecture Monthly Magazine: Data Lakes

Post Syndicated from Annik Stahl original https://aws.amazon.com/blogs/architecture/aws-architecture-monthly-magazine-data-lakes/

A data lake is the fastest way to get answers from all your data to all your users. It’s a centralized repository that allows you to store all your structured and unstructured data at any scale. You can store your data as-is, without having to first structure the data, and run different types of analytics—from dashboards and visualizations to big data processing, real-time analytics, and machine learning—to guide better decisions.

In This Issue

In this month’s Architecture Monthly, we speak to AWS Analytics Tech Leader, Taz Sayed, about general architecture trends in data lakes, the questions customers need to ask themselves before considering a data lake, and we get his outlook on the role the cloud will play in future development efforts.

We also introduce you to two companies that are utilizing data lakes for deep analytics, point you to an AWS managed solution, provide some real-world videos, and more.

  • Ask an Expert: Taz Sayed, Tech Leader, AWS Analytics
  • Blog: Kayo Sports builds real-time view of the customer on AWS
  • Case Study: Yulu Uses a Data Lake on AWS to Pedal a Change
  • Solution: Data Lake on AWS
  • Managed Solution: AWS Lake Formation
  • Whitepaper: Building Big Data Storage Solutions (Data Lakes) for Maximum Flexibility

How to Access the Magazine

We hope you’re enjoying Architecture Monthly, and we’d like to hear from you—leave us star rating and comment on the Amazon Kindle Newsstand page or contact us anytime at [email protected].

Delta: A Data Synchronization and Enrichment Platform

Post Syndicated from Netflix Technology Blog original https://medium.com/netflix-techblog/delta-a-data-synchronization-and-enrichment-platform-e82c36a79aee?source=rss----2615bd06b42e---4

Part I: Overview

Andreas Andreakis, Falguni Jhaveri, Ioannis Papapanagiotou, Mark Cho, Poorna Reddy, Tongliang Liu

Overview

It is a commonly observed pattern for applications to utilize multiple datastores where each is used to serve a specific need such as storing the canonical form of data (MySQL etc.), providing advanced search capabilities (ElasticSearch etc.), caching (Memcached etc.), and more. Typically when using multiple datastores, one of them acts as the primary store, and the others as derived stores. Now the challenge becomes how to keep these datastores in sync.

We have observed a series of distinct patterns which have tried to address multi-datastore synchronization, such as dual writes, distributed transactions, etc. However, these approaches have limitations in regards to feasibility, robustness, and maintenance. Beyond data synchronization, some applications also need to enrich their data by calling external services.

To address these challenges, we developed Delta. Delta is an eventual consistent, event driven, data synchronization and enrichment platform.

Existing Solutions

Dual Writes

In order to keep two datastores in sync, one could perform a dual write, which is executing a write to one datastore following a second write to the other. The first write can be retried, and the second can be aborted should the first fail after exhausting retries. However, the two datastores can get out of sync if the write to the second datastore fails. A common solution is to build a repair routine, which can periodically re-apply data from the first to the second store, or does so only if differences are detected.

Issues:
Implementing the repair routine typically is tailored work which may not be reusable. Also, data between the stores remain out of sync until the repair routine is applied. The solution can become increasingly complicated if more than two datastores are involved. Finally, the repair routine can add substantial stress to the primary data source during its activity.

Change Log Table

When mutations (like an insert, update and delete) occur on a set of tables, entries for the changes are added to the log table as part of the same transaction. Another thread or process is constantly polling events from the log table and writes them to one or multiple datastores, optionally removing events from the log table after acknowledged by all datastores.

Issues:
This needs to be implemented as a library and ideally without requiring code changes for the application using it. In a polyglot environment this library implementation needs to be repeated for each supported language and it is challenging to ensure consistent features and behavior across languages.

Another issue exists for the capture of schema changes, where some systems, like MySQL, don’t support transactional schema changes [1][2]. Therefore, the pattern to execute a change (like a schema change) and to transactionally write it to the change log table does not always work.

Distributed Transactions

Distributed transactions can be used to span a transaction across multiple heterogeneous datastores so that a write operation is either committed to all involved stores or to none.

Issues:
Distributed transactions have proven to be problematic across heterogeneous datastores. By their nature, they can only rely on the lowest common denominator of participating systems. For example, XA transactions block execution if the application process fails during the prepare phase; moreover, XA provides no deadlock detection and no support for optimistic concurrency-control schemes. Also, certain systems like ElasticSearch, do not support XA or any other heterogeneous transaction model. Thus, ensuring the atomicity of writes across different storage technologies remains a challenging problem for applications [3].

Delta

Delta has been developed to address the limitations of existing solutions for data synchronization, and also allows to enrich data on the fly. Our goal was to abstract those complexities from application developers so they can focus on implementing business features. In the following, we are describing “Movie Search”, an actual use case within Netflix that leverages Delta.

In Netflix the microservice architecture is widely adopted and each microservice typically handles only one type of data. The core movie data resides in a microservice called Movie Service, and related data such as movie deals, talents, vendors and so on are managed by multiple other microservices (e.g Deal Service, Talent Service and Vendor Service). Business users in Netflix Studios often need to search by various criteria for movies in order to keep track of productions, therefore, it is crucial for them to be able to search across all data that are related to movies.

Prior to Delta, the movie search team had to fetch data from multiple other microservices before indexing the movie data. Moreover, the team had to build a system that periodically updated their search index by querying others for changes, even if there was no change at all. That system quickly grew very complex and became difficult to maintain.

Figure 1. Polling System Prior to Delta

After on-boarding to Delta, the system is simplified into an event driven system, as depicted in the following diagram. CDC (Change-Data-Capture) events are sent by the Delta-Connector to a Keystone Kafka topic. A Delta application built using the Delta Stream Processing Framework consumes the CDC events from the topic, enriches each of them by calling other microservices, and finally sinks the enriched data to the search index in Elasticsearch. The whole process is nearly real-time, meaning as soon as the changes are committed to the datastore, the search indexes are updated.

Figure 2. Data Pipeline using Delta

In the following sections, we are going to describe the Delta-Connector that connects to a datastore and publishes CDC events to the Transport Layer, which is a real-time data transportation infrastructure routing CDC events to Kafka topics. And lastly we are going to describe the Delta Stream Processing Framework that application developers can use to build their data processing and enrichment logics.

CDC (Change-Data-Capture)

We have developed a CDC service named Delta-Connector, which is able to capture committed changes from a datastore in real-time and write them to a stream. Real-time changes are captured from the datastore’s transaction log and dumps. Dumps are taken because transaction logs typically do not contain the full history of changes. Changes are commonly serialized as Delta events so that a consumer does not need to be concerned if a change originates from the transaction log or a dump.

Delta-Connector offers multiple advanced features such as:

  • Ability to write into custom outputs beyond Kafka.
  • Ability to trigger manual dumps at any time, for all tables, a specific table, or for specific primary keys.
  • Dumps can be taken in chunks, so that there is no need to repeat from scratch in case of failure.
  • No need to acquire locks on tables, which is essential to ensure that the write traffic on the database is never blocked by our service.
  • High availability, via standby instances across AWS Availability Zones.

We currently support MySQL and Postgres, including when deployed in AWS RDS and its Aurora flavor. In addition, we support Cassandra (multi-master). We will cover the Delta-Connector in more detail in upcoming blog posts.

Kafka & Transport Layer

The transport layer of Delta events were built on top of the Messaging Service in our Keystone platform.

Historically, message publishing at Netflix is optimized for availability instead of durability (see a previous blog). The tradeoff is potential broker data inconsistencies in various edge scenarios. For example, unclean leader election will result in consumer to potentially duplicate or lose events.

For Delta, we want stronger durability guarantees in order to make sure CDC events can be guaranteed to arrive to derived stores. To enable this, we offered special purpose built Kafka cluster as a first class citizen. Some broker configuration looks like below.

In Keystone Kafka clusters, unclean leader election is usually enabled to favor producer availability. This can result in messages being lost when an out-of-sync replica is elected as a leader. For the new high durability Kafka cluster, unclean leader election is disabled to prevent these messages getting lost.

We’ve also increased the replication factor from 2 to 3 and the minimum insync replicas from 1 to 2. Producers writing to this cluster require acks from all, to guarantee that 2 out of 3 replicas have the latest messages that were written by the producers.

When a broker instance gets terminated, a new instance replaces the terminated broker. However, this new broker will need to catch up on out-of-sync replicas, which may take hours. To improve the recovery time for this scenario, we started using block storage volumes (Amazon Elastic Block Store) instead of local disks on the brokers. When a new instance replaces the terminated broker, it now attaches the EBS volume that the terminated instance had and starts catching up on new messages. This process reduces the catch up time from hours to minutes since the new instance no longer have to replicate from a blank state. In general, the separate life cycles of storage and broker greatly reduce the impact of broker replacement.

To further maximize our delivery guarantee, we used the message tracing system to detect any message loss due to extreme conditions (e.g clock drift on the partition leader).

Stream Processing Framework

The processing layer of Delta is built on top of Netflix SPaaS platform, which provides Apache Flink integration with the Netflix ecosystem. The platform provides a self-service UI which manages Flink job deployments and Flink cluster orchestration on top of our container management platform Titus. The self-service UI also manages job configurations and allows users to make dynamic configuration changes without having to recompile the Flink job.

Delta provides a stream processing framework on top of Flink and SPaaS that uses an annotation driven DSL (Domain Specific Language) to abstract technical details further away. For example, to define a step that enriches events by calling external services, users only need to write the following DSL and the framework will translate it into a model which is executed by Flink.

Figure 3. Enrichment DSL Example in a Delta Application

The processing framework not only reduces the learning curve, but also provides common stream processing functionalities like deduplication, schematization, as well as resilience and fault tolerance to address general operational concerns.

Delta Stream Processing Framework consists of two key modules, the DSL & API module and Runtime module. The DSL & API module provides the annotation based DSL and UDF (User-Defined-Function) APIs for users to write custom processing logic (e.g filter and transformation). The Runtime module provides DSL parser implementation that builds an internal representation of the processing steps in DAG models. The Execution component interprets the DAG models to initialize the actual Flink operators and eventually run the Flink app. The architecture of the framework is illustrated in the following Chart.

Figure 4. Delta Stream Processing Framework Architecture

This approach has several benefits:

  • Users can focus on their business logic without the need of learning the specifics of Flink or the SPaaS framework.
  • Optimization can be made in a way that is transparent to users, and bugs can be fixed without requiring any changes to user code (UDFs).
  • Operating Delta applications is made simple for users as the framework provides resilience and failure tolerance out of the box and collects many granular metrics that can be used for alerts.

Production Usages

Delta has been running in production for over a year and has been playing a crucial role in many Netflix Studio applications. It has helped teams implement use cases such as search indexing, data warehousing, and event driven workflows. Below is a view of the high level architecture of the Delta platform.

Figure 5. High Level Architecture of Delta

Stay Tuned

We will publish follow-up blogs about technical details of the key components such as Delta-Connector and Delta Stream Processing Framework. Please stay tuned. Also feel free to reach out to the authors for any questions you may have.

Credits

We would like to thank the following persons that have been involved in making Delta successful at Netflix: Allen Wang, Charles Zhao, Jaebin Yoon, Josh Snyder, Kasturi Chatterjee, Mark Cho, Olof Johansson, Piyush Goyal, Prashanth Ramdas, Raghuram Onti Srinivasan, Sandeep Gupta, Steven Wu, Tharanga Gamaethige, Yun Wang, and Zhenzhong Xu.

References

  1. https://dev.mysql.com/doc/refman/5.7/en/implicit-commit.html
  2. https://dev.mysql.com/doc/refman/5.7/en/cannot-roll-back.html
  3. Martin Kleppmann, Alastair R. Beresford, and Boerge Svingen. 2019. Online Event Processing. Queue 17, 1, pages 40 (February 2019), 21 pages. DOI: https://doi.org/10.1145/3317287.3321612


Delta: A Data Synchronization and Enrichment Platform was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Building and Scaling Data Lineage at Netflix to Improve Data Infrastructure Reliability, and…

Post Syndicated from Netflix Technology Blog original https://medium.com/netflix-techblog/building-and-scaling-data-lineage-at-netflix-to-improve-data-infrastructure-reliability-and-1a52526a7977?source=rss----2615bd06b42e---4

Building and Scaling Data Lineage at Netflix to Improve Data Infrastructure Reliability, and Efficiency

By: Di Lin, Girish Lingappa, Jitender Aswani

Imagine yourself in the role of a data-inspired decision maker staring at a metric on a dashboard about to make a critical business decision but pausing to ask a question — “Can I run a check myself to understand what data is behind this metric?”

Now, imagine yourself in the role of a software engineer responsible for a micro-service which publishes data consumed by few critical customer facing services (e.g. billing). You are about to make structural changes to the data and want to know who and what downstream to your service will be impacted.

Finally, imagine yourself in the role of a data platform reliability engineer tasked with providing advanced lead time to data pipeline (ETL) owners by proactively identifying issues upstream to their ETL jobs. You are designing a learning system to forecast Service Level Agreement (SLA) violations and would want to factor in all upstream dependencies and corresponding historical states.

At Netflix, user stories centered on understanding data dependencies shared above and countless more in Detection & Data Cleansing, Retention & Data Efficiency, Data Integrity, Cost Attribution, and Platform Reliability subject areas inspired Data Engineering and Infrastructure (DEI) team to envision a comprehensive data lineage system and embark on a development journey a few years ago. We adopted the following mission statement to guide our investments:

“Provide a complete and accurate data lineage system enabling decision-makers to win moments of truth.”

In the rest of this blog, we will a) touch on the complexity of Netflix cloud landscape, b) discuss lineage design goals, ingestion architecture and the corresponding data model, c) share the challenges we faced and the learnings we picked up along the way, and d) close it out with “what’s next” on this journey.

Netflix Data Landscape

Freedom & Responsibility (F&R) is the lynchpin of Netflix’s culture empowering teams to move fast to deliver on innovation and operate with freedom to satisfy their mission. Central engineering teams provide paved paths (secure, vetted and supported options) and guard rails to help reduce variance in choices available for tools and technologies to support the development of scalable technical architectures. Nonetheless, Netflix data landscape (see below) is complex and many teams collaborate effectively for sharing the responsibility of our data system management. Therefore, building a complete and accurate data lineage system to map out all the data-artifacts (including in-motion and at-rest data repositories, Kafka topics, apps, reports and dashboards, interactive and ad-hoc analysis queries, ML and experimentation models) is a monumental task and requires a scalable architecture, robust design, a strong engineering team and above all, amazing cross-functional collaboration.

Data Landscape

Design Goals

At the project inception stage, we defined a set of design goals to help guide the architecture and development work for data lineage to deliver a complete, accurate, reliable and scalable lineage system mapping Netflix’s diverse data landscape. Let’s review a few of these principles:

  • Ensure data integrity — Accurately capture the relationship in data from disparate data sources to establish trust with users because without absolute trust lineage data may do more harm than good.
  • Enable seamless integration — Design the system to integrate with a growing list of data tools and platforms including the ones that do not have the built-in meta-data instrumentation to derive data lineage from.
  • Design a flexible data model— Represent a wide range of data artifacts and relationships among them using a generic data model to enable a wide variety of business use cases.

Ingestion-at-scale

The data movement at Netflix does not necessarily follow a single paved path since engineers have the freedom to choose (and the responsibility to manage) the best available data tools and platforms to achieve their business goals. As a result, a single consolidated and centralized source of truth does not exist that can be leveraged to derive data lineage truth. Therefore, the ingestion approach for data lineage is designed to work with many disparate data sources.

Our data ingestion approach, in a nutshell, is classified broadly into two buckets — push or pull. Today, we are operating using a pull-heavy model. In this model, we scan system logs and metadata generated by various compute engines to collect corresponding lineage data. For example, we leverage inviso to list pig jobs and then lipstick to fetch tables and columns from these pig scripts. For spark compute engine, we leverage spark plan information and for Snowflake, admin tables capture the same information. In addition, we derive lineage information from scheduled ETL jobs by extracting workflow definitions and runtime metadata using Meson scheduler APIs.

In the push model paradigm, various platform tools such as the data transportation layer, reporting tools, and Presto will publish lineage events to a set of lineage related Kafka topics, therefore, making data ingestion relatively easy to scale improving scalability for the data lineage system.

Data Enrichment

The lineage data, when enriched with entity metadata and associated relationships, become more valuable to deliver on a rich set of business cases. We leverage Metacat data, our internal metadata store and service, to enrich lineage data with additional table metadata. We also leverage metadata from another internal tool, Genie, internal job and resource manager, to add job metadata (such as job owner, cluster, scheduler metadata) on lineage data. The ingestions (ETL) pipelines transform enriched datasets to a common data model (design based on a graph structure stored as vertices and edges) to serve lineage use cases. The lineage data along with the enriched information is accessed through many interfaces using SQL against the warehouse and Gremlin and a REST Lineage Service against a graph database populated from the lineage data discussed earlier in this paragraph.

Challenges

We faced a diverse set of challenges spread across many layers in the system. Netflix’s diverse data landscape made it challenging to capture all the right data and conforming it to a common data model. In addition, the ingestion layer designed to address several ingestions patterns added to operational complexity. Spark is the primary big-data compute engine at Netflix and with pretty much every upgrade in Spark, the spark plan changed as well springing continuous and unexpected surprises for us.

We defined a generic data model to store lineage information and now conforming the entity and associated relationships from various data sources to this data model. We are loading the lineage data to a graph database to enable seamless integration with a REST data lineage service to address business use cases. To improve data accuracy, we decided to leverage AWS S3 access logs to identify entity relationships not been captured by our traditional ingestion process.

We are continuing to address the ingestion challenges by adopting a system level instrumentation approach for spark, other compute engines, and data transport tools. We are designing a CRUD layer and exposing it as REST APIs to make it easier for anyone to publish lineage data to our pipelines.

We are taking a mature and comprehensive data lineage system and now extending its coverage far beyond traditional data warehouse realms with a goal to build universal data lineage to represent all the data artifacts and corresponding relationships. We are tackling a bunch of very interesting known unknowns with exciting initiatives in the field of data catalog and asset inventory. Mapping micro-services interactions, entities from real time infrastructure, and ML infrastructure and other non traditional data stores are few such examples.

Lineage Architecture and Data Model

Data Flow

As illustrated in the diagram above, various systems have their own independent data ingestion process in place leading to many different data models that store entities and relationships data at varying granularities. This data needed to be stitched together to accurately and comprehensively describe the Netflix data landscape and required a set of conformance processes before delivering the data for a wider audience.

During the conformance process, the data collected from different sources is transformed to make sure that all entities in our data flow, such as tables, jobs, reports, etc. are described in a consistent format, and stored in a generic data model for further usage.

Based on a standard data model at the entity level, we built a generic relationship model that describes the dependencies between any pair of entities. Using this approach, we are able to build a unified data model and the repository to deliver the right leverage to enable multiple use cases such as data discovery, SLA service and Data Efficiency.

Current Use Cases

Big Data Portal, a visual interface to data management at Netflix, has been the primary consumer of lineage data thus far. Many features benefit from lineage data including ranking of search results, table column usage for downstream jobs, deriving upstream dependencies in workflows, and building visibility of jobs writing to downstream tables.

Our most recent focus has been on powering (a) a data lineage service (REST based) leveraged by SLA service and (b) the data efficiency (to support data lifecycle management) use cases. SLA service relies on the job dependencies defined in ETL workflows to alert on potential SLA misses. This service also proactively alerts on any potential delays in few critical reports due to any job delays or failures anywhere upstream to it.

The data efficiency use cases leverages visibility on entities and their relationships to drive cost and attribution gains, auto cleansing of unused data in the warehouse .

What’s next?

Our journey on extending the value of lineage data to new frontiers has just begun and we have a long way to go in achieving the overarching goal of providing universal data lineage representing all entities and corresponding relationships for all data at Netflix. In the short to medium term, we are planning to onboard more data platforms and leverage graph database and a lineage REST service and GraphQL interface to enable more business use cases including improving developer productivity. We also plan to increase our investment in data detection initiatives by integrating lineage data and detection tools to efficiently scan our data to further improve data hygiene.

Please share your experience by adding your comments below and stay tuned for more on data lineage at Netflix in the follow up blog posts. .

Credits: We want to extend our sincere thanks to many esteemed colleagues from the data platform (part of Data Engineering and Infrastructure org) team at Netflix who pioneered this topic before us and who continue to extend our thinking on this topic with their valuable insights and are building many useful services on lineage data.

We will be at Strata San Francisco on March 27th in room 2001 delivering a tech session on this topic, please join us and share your experiences.

If you would like to be part of our small, impactful, and collaborative team — come join us.


Building and Scaling Data Lineage at Netflix to Improve Data Infrastructure Reliability, and… was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Bullet Updates – Windowing, Apache Pulsar PubSub, Configuration-based Data Ingestion, and More

Post Syndicated from rosaliebeevm original https://yahooeng.tumblr.com/post/183315480351

yahoodevelopers:

By Akshay Sarma, Principal Engineer, Verizon Media & Brian Xiao, Software Engineer, Verizon Media

This is the first of an ongoing series of blog posts sharing releases and announcements for Bullet, an open-sourced lightweight, scalable, pluggable, multi-tenant query system.

Bullet allows you to query any data flowing through a streaming system without having to store it first through its UI or API. The queries are injected into the running system and have minimal overhead. Running hundreds of queries generally fit into the overhead of just reading the streaming data. Bullet requires running an instance of its backend on your data. This backend runs on common stream processing frameworks (Storm and Spark Streaming currently supported).

The data on which Bullet sits determines what it is used for. For example, our team runs an instance of Bullet on user engagement data (~1M events/sec) to let developers find their own events to validate their code that produces this data. We also use this instance to interactively explore data, throw up quick dashboards to monitor live releases, count unique users, debug issues, and more.

Since open sourcing Bullet in 2017, we’ve been hard at work adding many new features! We’ll highlight some of these here and continue sharing update posts for future releases.

Windowing

Bullet used to operate in a request-response fashion – you would submit a query and wait for the query to meet its termination conditions (usually duration) before receiving results. For short-lived queries, say, a few seconds, this was fine. But as we started fielding more interactive and iterative queries, waiting even a minute for results became too cumbersome.

Enter windowing! Bullet now supports time and record-based windowing. With time windowing, you can break up your query into chunks of time over its duration and retrieve results for each chunk.  For example, you can calculate the average of a field, and stream back results every second:

In the above example, the aggregation is operating on all the data since the beginning of the query, but you can also do aggregations on just the windows themselves. This is often called a Tumbling window:

image

With record windowing, you can get the intermediate aggregation for each record that matches your query (a Sliding window). Or you can do a Tumbling window on records rather than time. For example, you could get results back every three records:

image

Overlapping windows in other ways (Hopping windows) or windows that reset based on different criteria (Session windows, Cascading windows) are currently being worked on. Stay tuned!

image
image

Apache Pulsar support as a native PubSub

Bullet uses a PubSub (publish-subscribe) message queue to send queries and results between the Web Service and Backend. As with everything else in Bullet, the PubSub is pluggable. You can use your favorite pubsub by implementing a few interfaces if you don’t want to use the ones we provide. Until now, we’ve maintained and supported a REST-based PubSub and an Apache Kafka PubSub. Now we are excited to announce supporting Apache Pulsar as well! Bullet Pulsar will be useful to those users who want to use Pulsar as their underlying messaging service.

If you aren’t familiar with Pulsar, setting up a local standalone is very simple, and by default, any Pulsar topics written to will automatically be created. Setting up an instance of Bullet with Pulsar instead of REST or Kafka is just as easy. You can refer to our documentation for more details.

image

Plug your data into Bullet without code

While Bullet worked on any data source located in any persistence layer, you still had to implement an interface to connect your data source to the Backend and convert it into a record container format that Bullet understands. For instance, your data might be located in Kafka and be in the Avro format. If you were using Bullet on Storm, you would perhaps write a Storm Spout to read from Kafka, deserialize, and convert the Avro data into the Bullet record format. This was the only interface in Bullet that required our customers to write their own code. Not anymore! Bullet DSL is a text/configuration-based format for users to plug in their data to the Bullet Backend without having to write a single line of code.

Bullet DSL abstracts away the two major components for plugging data into the Bullet Backend. A Connector piece to read from arbitrary data-sources and a Converter piece to convert that read data into the Bullet record container. We currently support and maintain a few of these – Kafka and Pulsar for Connectors and Avro, Maps and arbitrary Java POJOs for Converters. The Converters understand typed data and can even do a bit of minor ETL (Extract, Transform and Load) if you need to change your data around before feeding it into Bullet. As always, the DSL components are pluggable and you can write your own (and contribute it back!) if you need one that we don’t support.

We appreciate your feedback and contributions! Explore Bullet on GitHub, use and help contribute to the project, and chat with us on Google Groups. To get started, try our Quickstarts on Spark or Storm to set up an instance of Bullet on some fake data and play around with it.

How we simplified our Data Ingestion & Transformation Process

Post Syndicated from Grab Tech original https://engineering.grab.com/data-ingestion-transformation-product-insights

Introduction

As Grab grew from a small startup to an organisation serving millions of customers and driver partners, making day-to-day data-driven decisions became paramount. We needed a system to efficiently ingest data from mobile apps and backend systems and then make it available for analytics and engineering teams.

Thanks to modern data processing frameworks, ingesting data isn’t a big issue. However, at Grab scale it is a non-trivial task. We had to prepare for two key scenarios:

  • Business growth, including organic growth over time and expected seasonality effects.
  • Any unexpected peaks due to unforeseen circumstances. Our systems have to be horizontally scalable.

We could ingest data in batches, in real time, or a combination of the two. When you ingest data in batches, you can import it at regularly scheduled intervals or when it reaches a certain size. This is very useful when processes run on a schedule, such as reports that run daily at a specific time. Typically, batched data is useful for offline analytics and data science.

On the other hand, real-time ingestion has significant business value, such as with reactive systems. For example, when a customer provides feedback for a Grab superapp widget, we re-rank widgets based on that customer’s likes or dislikes. Note when information is very time-sensitive, you must continuously monitor its data.

This blog post describes how Grab built a scalable data ingestion system and how we went from prototyping with Spark Streaming to running a production-grade data processing cluster written in Golang.

Building the system without reinventing the wheel

The data ingestion system:

  1. Collects raw data as app events.
  2. Transforms the data into a structured format.
  3. Stores the data for analysis and monitoring.

In a previous blog post, we discussed dealing with batched data ETL with Spark. This post focuses on real-time ingestion.

We separated the data ingestion system into 3 layers: collection, transformation, and storage. This table and diagram highlights the tools used in each layer in our system’s first design.

Layer Tools
Collection Gateway, Kafka
Transformation Go processing service, Spark Streaming
Storage TalariaDB

Our first design might seem complex, but we used battle-tested and common tools such as Apache Kafka and Spark Streaming. This let us get an end-to-end solution up and running quickly.

Collection layer

Our collection layer had two sub-layers:

  1. Our custom built API Gateway received HTTP requests from the mobile app. It simply decoded and authenticated HTTP requests, streaming the data to the Kafka queue.
  2. The Kafka queue decoupled the transformation layer (shown in the above figure as the processing service and Spark streaming) from the collection layer (shown above as the Gateway service). We needed to retain raw data in the Kafka queue for fault tolerance of the entire system. Imagine an error where a data pipeline pollutes the data with flawed transformation code or just simply crashes. The Kafka queue saves us from data loss by data backfilling.

Since it’s robust and battle-tested, we chose Kafka as our queueing solution. It perfectly met our requirements, such as high throughput and low latency. Although Kafka takes some operational effort such as self-hosting and monitoring, Grab has a proficient and dedicated team managing our Kafka cluster.

Transformation layer

There are many options for real-time data processing, including Spark Streaming, Flink, and Storm. Since we use Spark for all our batch processing, we decided to use Spark Streaming.

We deployed a Golang processing service between Kafka and Spark Streaming. This service converts the data from Protobuf to Avro. Instead of pointing Spark Streaming directly to Kafka, we used this processing service as an intermediary. This was because our Spark Streaming job was written in Python and Spark doesn’t natively support protobuf decoding.  We used Avro format, since Grab historically used it for archiving streaming data. Each raw event was enriched and batched together with other events. Batches were then uploaded to S3.

Storage layer

TalariaDB is a Grab-built time-series database. It ingests events as columnar ORC files, indexing them by event name and time. We use the same ORC format files for batch processing. TalariaDB also implements the Presto Thrift connector interface, so our users could query certain event types by time range. They did this by connecting a Presto to a TalariaDB hosting distributed cluster.

Problems

Building and deploying our data pipeline’s MVP provided great value to our data analysts, engineers, and QA team. For example, our mobile app team could monitor any abnormal change in the real-time metrics, such as the screen load time for the latest released app version. The QA team could perform app side actions (book a ride, make payment, etc.) and check which events were triggered and received by the backend. The latency between the ingestion and the serving layer was only 4 minutes instead of the batch processing system’s 60 minutes. The streaming processing’s data showed good business value.

This prompted us to develop more features on top of our platform-collected real-time data. Very soon our QA engineers and the product analytics team used more and more of the real-time data processing system. They started instrumenting various mobile applications so more data started flowing in. However, as our ingested data increased, so did our problems. These were mostly related to operational complexity and the increased latency.

Operational complexity

Only a few team members could operate Spark Streaming and EMR. With more data and variable rates, our streaming jobs had scaling issues and failed occasionally. This was due to checkpoint issues when the cluster was under heavy load. Increasing the cluster size helped, but adding more nodes also increased the likelihood of losing more cluster nodes. When we lost nodes,our latency went up and added more work for our already busy on-call engineers.

Supporting native Protobuf

To simplify the architecture, we initially planned to bypass our Golang-written processing service for the real-time data pipeline. Our plan was to let Spark directly talk to the Kafka queue and send the output to S3. This required packaging the decoders for our protobuf messages for Python Spark jobs, which was cumbersome. We thought about rewriting our job in Scala, but we didn’t have enough experience with it.

Also, we’d soon hit some streaming limits from S3. Our Spark streaming job was consuming objects from S3, but the process was not continuous due to S3’s  eventual consistency. To avoid long pagination queries in the S3 API, we had to prefix the data with the hour in which it was ingested. This resulted in some data loss after processing by the Spark streaming. The loss happened because the new data would appear in S3 while Spark Streaming had already moved on to the next hour. We tried various tweaks, but it was just a bad design. As our data grew to over one terabyte per hour, our data loss grew with it.

Processing lag

On average, the time from our system ingesting an event to when it was available on the Presto was 4 to 6 minutes. We call that processing lag, as it happened due to our data processing. It was substantially worse under heavy loads, increasing to 8 to 13 minutes. While that wasn’t bad at this scale (a few TBs of data), it made some use cases impossible, such as monitoring. We needed to do better.

Simplifying the architecture and rewriting in Golang

After completing the MVP phase development, we noticed the Spark Streaming functionality we actually used was relatively trivial. In the Spark Streaming job, we only:

  • Partitioned the batch of events by event name.
  • Encoded the data in ORC format.
  • And uploaded to an S3 bucket.

To mitigate the problems mentioned above, we tried re-implementing the features in our existing Golang processing service. Besides consuming the data and publishing to an S3 bucket, the transformation service also needed to deal with event partitioning and ORC encoding.

One key problem we addressed was implementing a robust event partitioner with a large write throughput and low read latency. Fortunately, Golang has a nice concurrent map package. To further reduce the lock contention, we added sharding.

We made the changes, deployed the service to production,and discovered our service was now memory-bound as we buffered data for 1 minute. We did thorough benchmarking and profiling on heap allocation to improve memory utilization. By iteratively reducing inefficiencies and contributing to a lower CPU consumption, we made our data transformation more efficient.

Performance

After revamping the system, the elapsed time for a single event to travel from the gateway to our dashboard is about 1 minute. We also fixed the data loss issue. Finally, we significantly reduced our on-call workload by removing Spark Streaming.

Validation

At this point, we had both our old and new pipelines running in parallel. After drastically improving our performance, we needed to confirm we still got the same end results. This was done by running a query against each of the pipelines and comparing the results. Both systems were registered to the same Presto cluster.

We ran two SQL “excerpts” between the two pipelines in different order. Both queries returned the same events, validating our new pipeline’s correctness.

select count(1) from ((
 select uuid, time from grab_x.realtime_new
 where event = 'app.metric1' and time between 1541734140 and 1541734200
) except (
 select uuid, time from grab_x.realtime_old
 where event = 'app.metric1' and time between 1541734140 and 1541734200
))

/* output: 0 */

Conclusions

Scaling a data ingestion system to handle hundreds of thousands of events per second was a non-trivial task. However, by iterating and constantly simplifying our overall architecture, we were able to efficiently ingest the data and drive down its lag to around one minute.

Spark Streaming was a great tool and gave us time to understand the problem. But, understanding what we actually needed to build and iteratively optimise the entire data pipeline led us to:

  • Replacing Spark Streaming with our new Golang-implemented pipeline.
  • Removing Avro encoding.
  • Removing an intermediary S3 step.

Differences between the old and new pipelines are:

Old Pipeline New Pipeline
Languages Python, Go Go
Stages 4 services 3 services
Conversions Protobuf → Avro → ORC Protobuf → ORC
Lag 4-13 min 1 min

Systems usually become more and more complex over time, leading to tech debt and decreased performance. In our case, starting with more steps in the data pipeline was actually the simple solution, since we could re-use existing tools. But as we reduced processing stages, we’ve also seen fewer failures. By simplifying the problem, we improved performance and decreased operational complexity. At the end of the day, our data pipeline solves exactly our problem and does nothing else, keeping things fast.

AWS Online Tech Talks – June 2018

Post Syndicated from Devin Watson original https://aws.amazon.com/blogs/aws/aws-online-tech-talks-june-2018/

AWS Online Tech Talks – June 2018

Join us this month to learn about AWS services and solutions. New this month, we have a fireside chat with the GM of Amazon WorkSpaces and our 2nd episode of the “How to re:Invent” series. We’ll also cover best practices, deep dives, use cases and more! Join us and register today!

Note – All sessions are free and in Pacific Time.

Tech talks featured this month:

 

Analytics & Big Data

June 18, 2018 | 11:00 AM – 11:45 AM PTGet Started with Real-Time Streaming Data in Under 5 Minutes – Learn how to use Amazon Kinesis to capture, store, and analyze streaming data in real-time including IoT device data, VPC flow logs, and clickstream data.
June 20, 2018 | 11:00 AM – 11:45 AM PT – Insights For Everyone – Deploying Data across your Organization – Learn how to deploy data at scale using AWS Analytics and QuickSight’s new reader role and usage based pricing.

 

AWS re:Invent
June 13, 2018 | 05:00 PM – 05:30 PM PTEpisode 2: AWS re:Invent Breakout Content Secret Sauce – Hear from one of our own AWS content experts as we dive deep into the re:Invent content strategy and how we maintain a high bar.
Compute

June 25, 2018 | 01:00 PM – 01:45 PM PTAccelerating Containerized Workloads with Amazon EC2 Spot Instances – Learn how to efficiently deploy containerized workloads and easily manage clusters at any scale at a fraction of the cost with Spot Instances.

June 26, 2018 | 01:00 PM – 01:45 PM PTEnsuring Your Windows Server Workloads Are Well-Architected – Get the benefits, best practices and tools on running your Microsoft Workloads on AWS leveraging a well-architected approach.

 

Containers
June 25, 2018 | 09:00 AM – 09:45 AM PTRunning Kubernetes on AWS – Learn about the basics of running Kubernetes on AWS including how setup masters, networking, security, and add auto-scaling to your cluster.

 

Databases

June 18, 2018 | 01:00 PM – 01:45 PM PTOracle to Amazon Aurora Migration, Step by Step – Learn how to migrate your Oracle database to Amazon Aurora.
DevOps

June 20, 2018 | 09:00 AM – 09:45 AM PTSet Up a CI/CD Pipeline for Deploying Containers Using the AWS Developer Tools – Learn how to set up a CI/CD pipeline for deploying containers using the AWS Developer Tools.

 

Enterprise & Hybrid
June 18, 2018 | 09:00 AM – 09:45 AM PTDe-risking Enterprise Migration with AWS Managed Services – Learn how enterprise customers are de-risking cloud adoption with AWS Managed Services.

June 19, 2018 | 11:00 AM – 11:45 AM PTLaunch AWS Faster using Automated Landing Zones – Learn how the AWS Landing Zone can automate the set up of best practice baselines when setting up new

 

AWS Environments

June 21, 2018 | 11:00 AM – 11:45 AM PTLeading Your Team Through a Cloud Transformation – Learn how you can help lead your organization through a cloud transformation.

June 21, 2018 | 01:00 PM – 01:45 PM PTEnabling New Retail Customer Experiences with Big Data – Learn how AWS can help retailers realize actual value from their big data and deliver on differentiated retail customer experiences.

June 28, 2018 | 01:00 PM – 01:45 PM PTFireside Chat: End User Collaboration on AWS – Learn how End User Compute services can help you deliver access to desktops and applications anywhere, anytime, using any device.
IoT

June 27, 2018 | 11:00 AM – 11:45 AM PTAWS IoT in the Connected Home – Learn how to use AWS IoT to build innovative Connected Home products.

 

Machine Learning

June 19, 2018 | 09:00 AM – 09:45 AM PTIntegrating Amazon SageMaker into your Enterprise – Learn how to integrate Amazon SageMaker and other AWS Services within an Enterprise environment.

June 21, 2018 | 09:00 AM – 09:45 AM PTBuilding Text Analytics Applications on AWS using Amazon Comprehend – Learn how you can unlock the value of your unstructured data with NLP-based text analytics.

 

Management Tools

June 20, 2018 | 01:00 PM – 01:45 PM PTOptimizing Application Performance and Costs with Auto Scaling – Learn how selecting the right scaling option can help optimize application performance and costs.

 

Mobile
June 25, 2018 | 11:00 AM – 11:45 AM PTDrive User Engagement with Amazon Pinpoint – Learn how Amazon Pinpoint simplifies and streamlines effective user engagement.

 

Security, Identity & Compliance

June 26, 2018 | 09:00 AM – 09:45 AM PTUnderstanding AWS Secrets Manager – Learn how AWS Secrets Manager helps you rotate and manage access to secrets centrally.
June 28, 2018 | 09:00 AM – 09:45 AM PTUsing Amazon Inspector to Discover Potential Security Issues – See how Amazon Inspector can be used to discover security issues of your instances.

 

Serverless

June 19, 2018 | 01:00 PM – 01:45 PM PTProductionize Serverless Application Building and Deployments with AWS SAM – Learn expert tips and techniques for building and deploying serverless applications at scale with AWS SAM.

 

Storage

June 26, 2018 | 11:00 AM – 11:45 AM PTDeep Dive: Hybrid Cloud Storage with AWS Storage Gateway – Learn how you can reduce your on-premises infrastructure by using the AWS Storage Gateway to connecting your applications to the scalable and reliable AWS storage services.
June 27, 2018 | 01:00 PM – 01:45 PM PTChanging the Game: Extending Compute Capabilities to the Edge – Discover how to change the game for IIoT and edge analytics applications with AWS Snowball Edge plus enhanced Compute instances.
June 28, 2018 | 11:00 AM – 11:45 AM PTBig Data and Analytics Workloads on Amazon EFS – Get best practices and deployment advice for running big data and analytics workloads on Amazon EFS.

New – Pay-per-Session Pricing for Amazon QuickSight, Another Region, and Lots More

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/new-pay-per-session-pricing-for-amazon-quicksight-another-region-and-lots-more/

Amazon QuickSight is a fully managed cloud business intelligence system that gives you Fast & Easy to Use Business Analytics for Big Data. QuickSight makes business analytics available to organizations of all shapes and sizes, with the ability to access data that is stored in your Amazon Redshift data warehouse, your Amazon Relational Database Service (RDS) relational databases, flat files in S3, and (via connectors) data stored in on-premises MySQL, PostgreSQL, and SQL Server databases. QuickSight scales to accommodate tens, hundreds, or thousands of users per organization.

Today we are launching a new, session-based pricing option for QuickSight, along with additional region support and other important new features. Let’s take a look at each one:

Pay-per-Session Pricing
Our customers are making great use of QuickSight and take full advantage of the power it gives them to connect to data sources, create reports, and and explore visualizations.

However, not everyone in an organization needs or wants such powerful authoring capabilities. Having access to curated data in dashboards and being able to interact with the data by drilling down, filtering, or slicing-and-dicing is more than adequate for their needs. Subscribing them to a monthly or annual plan can be seen as an unwarranted expense, so a lot of such casual users end up not having access to interactive data or BI.

In order to allow customers to provide all of their users with interactive dashboards and reports, the Enterprise Edition of Amazon QuickSight now allows Reader access to dashboards on a Pay-per-Session basis. QuickSight users are now classified as Admins, Authors, or Readers, with distinct capabilities and prices:

Authors have access to the full power of QuickSight; they can establish database connections, upload new data, create ad hoc visualizations, and publish dashboards, all for $9 per month (Standard Edition) or $18 per month (Enterprise Edition).

Readers can view dashboards, slice and dice data using drill downs, filters and on-screen controls, and download data in CSV format, all within the secure QuickSight environment. Readers pay $0.30 for 30 minutes of access, with a monthly maximum of $5 per reader.

Admins have all authoring capabilities, and can manage users and purchase SPICE capacity in the account. The QuickSight admin now has the ability to set the desired option (Author or Reader) when they invite members of their organization to use QuickSight. They can extend Reader invites to their entire user base without incurring any up-front or monthly costs, paying only for the actual usage.

To learn more, visit the QuickSight Pricing page.

A New Region
QuickSight is now available in the Asia Pacific (Tokyo) Region:

The UI is in English, with a localized version in the works.

Hourly Data Refresh
Enterprise Edition SPICE data sets can now be set to refresh as frequently as every hour. In the past, each data set could be refreshed up to 5 times a day. To learn more, read Refreshing Imported Data.

Access to Data in Private VPCs
This feature was launched in preview form late last year, and is now available in production form to users of the Enterprise Edition. As I noted at the time, you can use it to implement secure, private communication with data sources that do not have public connectivity, including on-premises data in Teradata or SQL Server, accessed over an AWS Direct Connect link. To learn more, read Working with AWS VPC.

Parameters with On-Screen Controls
QuickSight dashboards can now include parameters that are set using on-screen dropdown, text box, numeric slider or date picker controls. The default value for each parameter can be set based on the user name (QuickSight calls this a dynamic default). You could, for example, set an appropriate default based on each user’s office location, department, or sales territory. Here’s an example:

To learn more, read about Parameters in QuickSight.

URL Actions for Linked Dashboards
You can now connect your QuickSight dashboards to external applications by defining URL actions on visuals. The actions can include parameters, and become available in the Details menu for the visual. URL actions are defined like this:

You can use this feature to link QuickSight dashboards to third party applications (e.g. Salesforce) or to your own internal applications. Read Custom URL Actions to learn how to use this feature.

Dashboard Sharing
You can now share QuickSight dashboards across every user in an account.

Larger SPICE Tables
The per-data set limit for SPICE tables has been raised from 10 GB to 25 GB.

Upgrade to Enterprise Edition
The QuickSight administrator can now upgrade an account from Standard Edition to Enterprise Edition with a click. This enables provisioning of Readers with pay-per-session pricing, private VPC access, row-level security for dashboards and data sets, and hourly refresh of data sets. Enterprise Edition pricing applies after the upgrade.

Available Now
Everything I listed above is available now and you can start using it today!

You can try QuickSight for 60 days at no charge, and you can also attend our June 20th Webinar.

Jeff;

 

Connect, collaborate, and learn at AWS Global Summits in 2018

Post Syndicated from Tina Kelleher original https://aws.amazon.com/blogs/big-data/connect-collaborate-and-learn-at-aws-global-summits-in-2018/

Regardless of your career path, there’s no denying that attending industry events can provide helpful career development opportunities — not only for improving and expanding your skill sets, but for networking as well. According to this article from PayScale.com, experts estimate that somewhere between 70-85% of new positions are landed through networking.

Narrowing our focus to networking opportunities with cloud computing professionals who’re working on tackling some of today’s most innovative and exciting big data solutions, attending big data-focused sessions at an AWS Global Summit is a great place to start.

AWS Global Summits are free events that bring the cloud computing community together to connect, collaborate, and learn about AWS. As the name suggests, these summits are held in major cities around the world, and attract technologists from all industries and skill levels who’re interested in hearing from AWS leaders, experts, partners, and customers.

In addition to networking opportunities with top cloud technology providers, consultants and your peers in our Partner and Solutions Expo, you’ll also hone your AWS skills by attending and participating in a multitude of education and training opportunities.

Here’s a brief sampling of some of the upcoming sessions relevant to big data professionals:

May 31st : Big Data Architectural Patterns and Best Practices on AWS | AWS Summit – Mexico City

June 6th-7th: Various (click on the “Big Data & Analytics” header) | AWS Summit – Berlin

June 20-21st : [email protected] | Public Sector Summit – Washington DC

June 21st: Enabling Self Service for Data Scientists with AWS Service Catalog | AWS Summit – Sao Paulo

Be sure to check out the main page for AWS Global Summits, where you can see which cities have AWS Summits planned for 2018, register to attend an upcoming event, or provide your information to be notified when registration opens for a future event.

Analyze Apache Parquet optimized data using Amazon Kinesis Data Firehose, Amazon Athena, and Amazon Redshift

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/analyzing-apache-parquet-optimized-data-using-amazon-kinesis-data-firehose-amazon-athena-and-amazon-redshift/

Amazon Kinesis Data Firehose is the easiest way to capture and stream data into a data lake built on Amazon S3. This data can be anything—from AWS service logs like AWS CloudTrail log files, Amazon VPC Flow Logs, Application Load Balancer logs, and others. It can also be IoT events, game events, and much more. To efficiently query this data, a time-consuming ETL (extract, transform, and load) process is required to massage and convert the data to an optimal file format, which increases the time to insight. This situation is less than ideal, especially for real-time data that loses its value over time.

To solve this common challenge, Kinesis Data Firehose can now save data to Amazon S3 in Apache Parquet or Apache ORC format. These are optimized columnar formats that are highly recommended for best performance and cost-savings when querying data in S3. This feature directly benefits you if you use Amazon Athena, Amazon Redshift, AWS Glue, Amazon EMR, or any other big data tools that are available from the AWS Partner Network and through the open-source community.

Amazon Connect is a simple-to-use, cloud-based contact center service that makes it easy for any business to provide a great customer experience at a lower cost than common alternatives. Its open platform design enables easy integration with other systems. One of those systems is Amazon Kinesis—in particular, Kinesis Data Streams and Kinesis Data Firehose.

What’s really exciting is that you can now save events from Amazon Connect to S3 in Apache Parquet format. You can then perform analytics using Amazon Athena and Amazon Redshift Spectrum in real time, taking advantage of this key performance and cost optimization. Of course, Amazon Connect is only one example. This new capability opens the door for a great deal of opportunity, especially as organizations continue to build their data lakes.

Amazon Connect includes an array of analytics views in the Administrator dashboard. But you might want to run other types of analysis. In this post, I describe how to set up a data stream from Amazon Connect through Kinesis Data Streams and Kinesis Data Firehose and out to S3, and then perform analytics using Athena and Amazon Redshift Spectrum. I focus primarily on the Kinesis Data Firehose support for Parquet and its integration with the AWS Glue Data Catalog, Amazon Athena, and Amazon Redshift.

Solution overview

Here is how the solution is laid out:

 

 

The following sections walk you through each of these steps to set up the pipeline.

1. Define the schema

When Kinesis Data Firehose processes incoming events and converts the data to Parquet, it needs to know which schema to apply. The reason is that many times, incoming events contain all or some of the expected fields based on which values the producers are advertising. A typical process is to normalize the schema during a batch ETL job so that you end up with a consistent schema that can easily be understood and queried. Doing this introduces latency due to the nature of the batch process. To overcome this issue, Kinesis Data Firehose requires the schema to be defined in advance.

To see the available columns and structures, see Amazon Connect Agent Event Streams. For the purpose of simplicity, I opted to make all the columns of type String rather than create the nested structures. But you can definitely do that if you want.

The simplest way to define the schema is to create a table in the Amazon Athena console. Open the Athena console, and paste the following create table statement, substituting your own S3 bucket and prefix for where your event data will be stored. A Data Catalog database is a logical container that holds the different tables that you can create. The default database name shown here should already exist. If it doesn’t, you can create it or use another database that you’ve already created.

CREATE EXTERNAL TABLE default.kfhconnectblog (
  awsaccountid string,
  agentarn string,
  currentagentsnapshot string,
  eventid string,
  eventtimestamp string,
  eventtype string,
  instancearn string,
  previousagentsnapshot string,
  version string
)
STORED AS parquet
LOCATION 's3://your_bucket/kfhconnectblog/'
TBLPROPERTIES ("parquet.compression"="SNAPPY")

That’s all you have to do to prepare the schema for Kinesis Data Firehose.

2. Define the data streams

Next, you need to define the Kinesis data streams that will be used to stream the Amazon Connect events.  Open the Kinesis Data Streams console and create two streams.  You can configure them with only one shard each because you don’t have a lot of data right now.

3. Define the Kinesis Data Firehose delivery stream for Parquet

Let’s configure the Data Firehose delivery stream using the data stream as the source and Amazon S3 as the output. Start by opening the Kinesis Data Firehose console and creating a new data delivery stream. Give it a name, and associate it with the Kinesis data stream that you created in Step 2.

As shown in the following screenshot, enable Record format conversion (1) and choose Apache Parquet (2). As you can see, Apache ORC is also supported. Scroll down and provide the AWS Glue Data Catalog database name (3) and table names (4) that you created in Step 1. Choose Next.

To make things easier, the output S3 bucket and prefix fields are automatically populated using the values that you defined in the LOCATION parameter of the create table statement from Step 1. Pretty cool. Additionally, you have the option to save the raw events into another location as defined in the Source record S3 backup section. Don’t forget to add a trailing forward slash “ / “ so that Data Firehose creates the date partitions inside that prefix.

On the next page, in the S3 buffer conditions section, there is a note about configuring a large buffer size. The Parquet file format is highly efficient in how it stores and compresses data. Increasing the buffer size allows you to pack more rows into each output file, which is preferred and gives you the most benefit from Parquet.

Compression using Snappy is automatically enabled for both Parquet and ORC. You can modify the compression algorithm by using the Kinesis Data Firehose API and update the OutputFormatConfiguration.

Be sure to also enable Amazon CloudWatch Logs so that you can debug any issues that you might run into.

Lastly, finalize the creation of the Firehose delivery stream, and continue on to the next section.

4. Set up the Amazon Connect contact center

After setting up the Kinesis pipeline, you now need to set up a simple contact center in Amazon Connect. The Getting Started page provides clear instructions on how to set up your environment, acquire a phone number, and create an agent to accept calls.

After setting up the contact center, in the Amazon Connect console, choose your Instance Alias, and then choose Data Streaming. Under Agent Event, choose the Kinesis data stream that you created in Step 2, and then choose Save.

At this point, your pipeline is complete.  Agent events from Amazon Connect are generated as agents go about their day. Events are sent via Kinesis Data Streams to Kinesis Data Firehose, which converts the event data from JSON to Parquet and stores it in S3. Athena and Amazon Redshift Spectrum can simply query the data without any additional work.

So let’s generate some data. Go back into the Administrator console for your Amazon Connect contact center, and create an agent to handle incoming calls. In this example, I creatively named mine Agent One. After it is created, Agent One can get to work and log into their console and set their availability to Available so that they are ready to receive calls.

To make the data a bit more interesting, I also created a second agent, Agent Two. I then made some incoming and outgoing calls and caused some failures to occur, so I now have enough data available to analyze.

5. Analyze the data with Athena

Let’s open the Athena console and run some queries. One thing you’ll notice is that when we created the schema for the dataset, we defined some of the fields as Strings even though in the documentation they were complex structures.  The reason for doing that was simply to show some of the flexibility of Athena to be able to parse JSON data. However, you can define nested structures in your table schema so that Kinesis Data Firehose applies the appropriate schema to the Parquet file.

Let’s run the first query to see which agents have logged into the system.

The query might look complex, but it’s fairly straightforward:

WITH dataset AS (
  SELECT 
    from_iso8601_timestamp(eventtimestamp) AS event_ts,
    eventtype,
    -- CURRENT STATE
    json_extract_scalar(
      currentagentsnapshot,
      '$.agentstatus.name') AS current_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        currentagentsnapshot,
        '$.agentstatus.starttimestamp')) AS current_starttimestamp,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.firstname') AS current_firstname,
    json_extract_scalar(
      currentagentsnapshot,
      '$.configuration.lastname') AS current_lastname,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.username') AS current_username,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.routingprofile.defaultoutboundqueue.name') AS               current_outboundqueue,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.routingprofile.inboundqueues[0].name') as current_inboundqueue,
    -- PREVIOUS STATE
    json_extract_scalar(
      previousagentsnapshot, 
      '$.agentstatus.name') as prev_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        previousagentsnapshot, 
       '$.agentstatus.starttimestamp')) as prev_starttimestamp,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.firstname') as prev_firstname,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.lastname') as prev_lastname,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.username') as prev_username,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.routingprofile.defaultoutboundqueue.name') as current_outboundqueue,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.routingprofile.inboundqueues[0].name') as prev_inboundqueue
  from kfhconnectblog
  where eventtype <> 'HEART_BEAT'
)
SELECT
  current_status as status,
  current_username as username,
  event_ts
FROM dataset
WHERE eventtype = 'LOGIN' AND current_username <> ''
ORDER BY event_ts DESC

The query output looks something like this:

Here is another query that shows the sessions each of the agents engaged with. It tells us where they were incoming or outgoing, if they were completed, and where there were missed or failed calls.

WITH src AS (
  SELECT
     eventid,
     json_extract_scalar(currentagentsnapshot, '$.configuration.username') as username,
     cast(json_extract(currentagentsnapshot, '$.contacts') AS ARRAY(JSON)) as c,
     cast(json_extract(previousagentsnapshot, '$.contacts') AS ARRAY(JSON)) as p
  from kfhconnectblog
),
src2 AS (
  SELECT *
  FROM src CROSS JOIN UNNEST (c, p) AS contacts(c_item, p_item)
),
dataset AS (
SELECT 
  eventid,
  username,
  json_extract_scalar(c_item, '$.contactid') as c_contactid,
  json_extract_scalar(c_item, '$.channel') as c_channel,
  json_extract_scalar(c_item, '$.initiationmethod') as c_direction,
  json_extract_scalar(c_item, '$.queue.name') as c_queue,
  json_extract_scalar(c_item, '$.state') as c_state,
  from_iso8601_timestamp(json_extract_scalar(c_item, '$.statestarttimestamp')) as c_ts,
  
  json_extract_scalar(p_item, '$.contactid') as p_contactid,
  json_extract_scalar(p_item, '$.channel') as p_channel,
  json_extract_scalar(p_item, '$.initiationmethod') as p_direction,
  json_extract_scalar(p_item, '$.queue.name') as p_queue,
  json_extract_scalar(p_item, '$.state') as p_state,
  from_iso8601_timestamp(json_extract_scalar(p_item, '$.statestarttimestamp')) as p_ts
FROM src2
)
SELECT 
  username,
  c_channel as channel,
  c_direction as direction,
  p_state as prev_state,
  c_state as current_state,
  c_ts as current_ts,
  c_contactid as id
FROM dataset
WHERE c_contactid = p_contactid
ORDER BY id DESC, current_ts ASC

The query output looks similar to the following:

6. Analyze the data with Amazon Redshift Spectrum

With Amazon Redshift Spectrum, you can query data directly in S3 using your existing Amazon Redshift data warehouse cluster. Because the data is already in Parquet format, Redshift Spectrum gets the same great benefits that Athena does.

Here is a simple query to show querying the same data from Amazon Redshift. Note that to do this, you need to first create an external schema in Amazon Redshift that points to the AWS Glue Data Catalog.

SELECT 
  eventtype,
  json_extract_path_text(currentagentsnapshot,'agentstatus','name') AS current_status,
  json_extract_path_text(currentagentsnapshot, 'configuration','firstname') AS current_firstname,
  json_extract_path_text(currentagentsnapshot, 'configuration','lastname') AS current_lastname,
  json_extract_path_text(
    currentagentsnapshot,
    'configuration','routingprofile','defaultoutboundqueue','name') AS current_outboundqueue,
FROM default_schema.kfhconnectblog

The following shows the query output:

Summary

In this post, I showed you how to use Kinesis Data Firehose to ingest and convert data to columnar file format, enabling real-time analysis using Athena and Amazon Redshift. This great feature enables a level of optimization in both cost and performance that you need when storing and analyzing large amounts of data. This feature is equally important if you are investing in building data lakes on AWS.

 


Additional Reading

If you found this post useful, be sure to check out Analyzing VPC Flow Logs with Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight and Work with partitioned data in AWS Glue.


About the Author

Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics and business intelligence needs. Roy is big Manchester United fan cheering his team on and hanging out with his family.