We all know that our cell phones constantly give our location away to our mobile network operators; that’s how they work. A group of researchers has figured out a way to fix that. “Pretty Good Phone Privacy” (PGPP) protects both user identity and user location using the existing cellular networks. It protects users from fake cell phone towers (IMSI-catchers) and surveillance by cell providers.
It’s a clever system. The players are the user, a traditional mobile network operator (MNO) like AT&T or Verizon, and a new mobile virtual network operator (MVNO). MVNOs aren’t new. They’re intermediaries like Cricket and Boost.
Here’s how it works:
One-time setup: The user’s phone gets a new SIM from the MVNO. All MVNO SIMs are identical.
Monthly: The user pays their bill to the MVNO (credit card or otherwise), and the phone receives anonymous authentication tokens, built on Chaum blind signatures, for each time slice (e.g., hour) in the coming month. A sketch of the blinding math follows this list.
Ongoing: When the phone talks to a tower (run by the MNO), it sends a token for the current time slice. This is relayed to an MVNO backend server, which checks the Chaum blind signature of the token. If it’s valid, the MVNO tells the MNO that the user is authenticated, and the user receives a temporary random ID and an IP address. (Again, this is how MVNOs like Boost already work.)
On demand: The user uses the phone normally.
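The cryptographic core is Chaum’s blinding trick: the MVNO signs each token without ever seeing it, so the signature it later verifies cannot be linked back to the purchase. Below is a minimal textbook-RSA sketch of that flow in Python; the tiny key, the integer token, and the absence of padding are simplifications for illustration, not the paper’s actual construction.

import secrets
from math import gcd

# Tiny demo RSA key (a real deployment would use 2048+ bit primes).
p, q = 1009, 1013
n = p * q
e = 65537
d = pow(e, -1, (p - 1) * (q - 1))

token = 424242  # the authentication token, encoded as an integer < n

# User: blind the token with a random factor r before sending it for signing.
while True:
    r = secrets.randbelow(n - 2) + 2
    if gcd(r, n) == 1:
        break
blinded = (token * pow(r, e, n)) % n

# MVNO: sign the blinded value; it never learns which token it signed.
blind_sig = pow(blinded, d, n)

# User: unblind, yielding an ordinary RSA signature on the token itself.
sig = (blind_sig * pow(r, -1, n)) % n

# MVNO backend, later: verify the token when it is spent at a tower.
assert pow(sig, e, n) == token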
The MNO doesn’t have to modify its system in any way. The PGPP MVNO implementation is in software. The user’s traffic is sent to the MVNO gateway and then out onto the Internet, potentially even using a VPN.
In today’s cell networks, all connectivity is data connectivity. The user can choose to be data-only (e.g., use Signal for voice), or use the MVNO or a third party for VoIP service that will look just like normal telephony.
The group prototyped and tested everything with real phones in the lab. Their approach adds essentially zero latency, and doesn’t introduce any new bottlenecks, so it doesn’t have performance/scalability problems like most anonymity networks. The service could handle tens of millions of users on a single server, because it only has to do infrequent authentication, though for resilience you’d probably run more.
Security researcher Ahmed Hassan has shown that spoofing locations with a rooted Android device lets him abuse Telegram’s “People Nearby” feature to pinpoint the physical location of its users:
Using readily available software and a rooted Android device, he’s able to spoof the location his device reports to Telegram servers. By using just three different locations and measuring the corresponding distance reported by People Nearby, he is able to pinpoint a user’s precise location.
[…]
A proof-of-concept video the researcher sent to Telegram showed how he could discern the address of a People Nearby user when he used a free GPS spoofing app to make his phone report just three different locations. He then drew a circle around each of the three locations with a radius of the distance reported by Telegram. The user’s precise location was where all three intersected.
[…]
Fixing the problem — or at least making it much harder to exploit it — wouldn’t be hard from a technical perspective. Rounding locations to the nearest mile and adding some random bits generally suffices. When the Tinder app had a similar disclosure vulnerability, developers used this kind of technique to fix it.
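As a sketch of that mitigation (the grid size and jitter here are illustrative assumptions, not Tinder’s or Telegram’s actual parameters), the server-side fix amounts to a few lines:

import random

GRID_DEGREES = 0.014  # roughly one mile of latitude; an assumption for illustration

def fuzz_location(lat, lon):
    # Snap the reported coordinates to a coarse grid...
    lat = round(lat / GRID_DEGREES) * GRID_DEGREES
    lon = round(lon / GRID_DEGREES) * GRID_DEGREES
    # ...then add bounded random noise so repeated distance queries
    # cannot be trilaterated back to a precise point.
    lat += random.uniform(-GRID_DEGREES / 2, GRID_DEGREES / 2)
    lon += random.uniform(-GRID_DEGREES / 2, GRID_DEGREES / 2)
    return lat, lon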
Abstract: Executive Order 12,333 (“EO 12333”) is a 1980s Executive Order signed by President Ronald Reagan that, among other things, establishes an overarching policy framework for the Executive Branch’s spying powers. Although electronic surveillance programs authorized by EO 12333 generally target foreign intelligence from foreign targets, its permissive targeting standards allow for the substantial collection of Americans’ communications containing little to no foreign intelligence value. This fact alone necessitates closer inspection.
This working draft conducts such an inspection by collecting and coalescing the various declassifications, disclosures, legislative investigations, and news reports concerning EO 12333 electronic surveillance programs in order to provide a better understanding of how the Executive Branch implements the order and the surveillance programs it authorizes. The Article pays particular attention to EO 12333’s designation of the National Security Agency as primarily responsible for conducting signals intelligence, which includes the installation of malware, the analysis of internet traffic traversing the telecommunications backbone, the hacking of U.S.-based companies like Yahoo and Google, and the analysis of Americans’ communications, contact lists, text messages, geolocation data, and other information.
After exploring the electronic surveillance programs authorized by EO 12333, this Article proposes reforms to the existing policy framework, including narrowing the aperture of authorized surveillance, increasing privacy standards for the retention of data, and requiring greater transparency and accountability.
The Wall Street Journal has an article about a company called Anomaly Six LLC that has an SDK that’s used by “more than 500 mobile applications.” Through that SDK, the company collects location data from users, which it then sells.
Anomaly Six is a federal contractor that provides global-location-data products to branches of the U.S. government and private-sector clients. The company told The Wall Street Journal it restricts the sale of U.S. mobile phone movement data only to nongovernmental, private-sector clients.
[…]
Anomaly Six was founded by defense-contracting veterans who worked closely with government agencies for most of their careers and built a company to cater in part to national-security agencies, according to court records and interviews.
Just one of the many Internet companies spying on our every move for profit. And I’m sure they sell to the US government; it’s legal, and why would they forgo those sales?
The NSA has issued an advisory on the risks of location data.
Mitigations reduce, but do not eliminate, location tracking risks in mobile devices. Most users rely on features disabled by such mitigations, making such safeguards impractical. Users should be aware of these risks and take action based on their specific situation and risk tolerance. When location exposure could be detrimental to a mission, users should prioritize mission risk and apply location tracking mitigations to the greatest extent possible. While the guidance in this document may be useful to a wide range of users, it is intended primarily for NSS/DoD system users.
The document provides a list of mitigation strategies, including turning things off:
If it is critical that location is not revealed for a particular mission, consider the following recommendations:
Determine a non-sensitive location where devices with wireless capabilities can be secured prior to the start of any activities. Ensure that the mission site cannot be predicted from this location.
Leave all devices with any wireless capabilities (including personal devices) at this non-sensitive location. Turning off the device may not be sufficient if a device has been compromised.
For mission transportation, use vehicles without built-in wireless communication capabilities, or turn off the capabilities, if possible.
Of course, turning off your wireless devices is itself a signal that something is going on. It’s hard to be clandestine in our always connected world.
Google reportedly has a database called Sensorvault in which it stores location data for millions of devices going back almost a decade.
The article is about geofence warrants, where the police go to companies like Google and ask for information about every device in a particular geographic area at a particular time. In 2013, we learned from Edward Snowden that the NSA does this worldwide. Its program is called CO-TRAVELLER. The NSA claims it stopped doing that in 2014 — probably just stopped doing it in the US — but why should it bother when the government can just get the data from Google?
In November, the company Strava released an anonymous data-visualization map showing all the fitness activity by everyone using the app.
Over this weekend, someone realized that it could be used to locate secret military bases: just look for repeated fitness activity in the middle of nowhere.
Many customers use Amazon Kinesis to ingest, analyze, and persist their streaming data. One of the easiest ways to gain real-time insights into your streaming data is to use Kinesis Analytics. It enables you to query the data in your stream or build entire streaming applications using SQL. Customers use Kinesis Analytics for things like filtering, aggregation, and anomaly detection.
Kinesis Analytics now gives you the option to preprocess your data with AWS Lambda. This gives you a great deal of flexibility in defining what data gets analyzed by your Kinesis Analytics application. You can also define how that data is structured before it is queried by your SQL.
In this post, I discuss some common use cases for preprocessing, and walk you through an example to help highlight its applicability.
Common use cases
There are many reasons why you might choose to preprocess data before starting your analysis. Because you build your preprocessing logic with Lambda, your preprocessor can do anything supported by Lambda. However, there are some specific use cases that lend themselves well to preprocessing, such as data enrichment and data transformation.
Enrichment
In some scenarios, you may need to enhance your streaming data with additional information before you perform your SQL analysis. Kinesis Analytics gives you the ability to use data from Amazon S3 in your Kinesis Analytics application, using the Reference Data feature. However, you cannot use other data sources from within your SQL query.
To add dynamic data to your streaming data, you can preprocess with a Lambda function, and retrieve the data from the data store of your choosing. For example, consider a scenario where you’re streaming some data about users of your application, including their IP address. You want to do some real-time analysis on the geographic locations of your customers. In this example, your preprocessing Lambda function uses your data source for geolocation information to retrieve the user’s city, state or province, and country, based on the IP address that was included in the streaming record. You then enrich the record with that information and now your SQL query can use those attributes in its aggregation.
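A hedged Python sketch of such a preprocessor follows. The event and return shapes are the Kinesis Analytics preprocessing models described later in this post; the ip_address field name and the geo_lookup helper are hypothetical stand-ins for your own record layout and geolocation data source.

import base64
import json

def geo_lookup(ip):
    """Hypothetical lookup against your geolocation data store."""
    return {"city": "Seattle", "state_or_province": "WA", "country": "US"}

def handler(event, context):
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        # Enrich the record with location attributes derived from its IP address.
        payload.update(geo_lookup(payload["ip_address"]))
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}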
Transformation
Because Kinesis Analytics uses SQL to analyze your data, the structure of your streaming records must be mapped to a schema. If your records are JSON or CSV, Kinesis Analytics automatically creates a schema. However, if your JSON records contain complex nested arrays, you may need to customize how the record structure is mapped to a flattened schema. Further, Kinesis Analytics is unable to automatically parse formats such as GZIP, protobuf, or Avro.
If your input records are unstructured text, Kinesis Analytics creates a schema, but it consists of a single column representing your entire record. To remedy these complexities, use Lambda to transform and convert your streaming data so that it more easily maps to a schema that can be queried by the SQL in your Kinesis Analytics application.
Assume that you’re streaming raw Apache access log data from a web fleet to a Kinesis stream, and you want to use Kinesis Analytics to detect anomalies in your HTTP response codes. In this example, you want to detect when your stream contains an unusually large number of 500 response codes. This may indicate that something has gone wrong somewhere in your application, and as a result, Apache is returning 500 responses to clients. This is typically not a good customer experience.
An example Apache access log record looks like this:
231.55.150.184 - - [28/Sep/2017:11:18:59 -0400] "PUT /explore HTTP/1.1" 200 2742 "-" "Mozilla/5.0 (Windows; U; Windows NT 6.3) AppleWebKit/538.0.1 (KHTML, like Gecko) Chrome/20.0.872.0 Safari/538.0.1"
Although its structure is well-defined, it is not JSON or CSV, so it doesn’t map to a defined schema in Kinesis Analytics. To use Kinesis Analytics with raw Apache log records, you can transform them to JSON or CSV with a preprocessing Lambda function. For example, you can convert each record to a simple JSON document that easily maps to a schema:
{
    "client_ip": "231.55.150.184",
    "request_time": "28/Sep/2017:11:18:59 -0400",
    "method": "PUT",
    "resource": "/explore",
    "protocol": "HTTP/1.1",
    "response": 200,
    "response_size": 2742,
    "user-agent": "Mozilla/5.0 (Windows; U; Windows NT 6.3) AppleWebKit/538.0.1 (KHTML, like Gecko) Chrome/20.0.872.0 Safari/538.0.1"
}
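A hedged Python sketch of that transformation is below; the regular expression covers the combined log format of the sample record above, and you would adjust it to your own log configuration.

import base64
import json
import re

LOG_RE = re.compile(
    r'(?P<client_ip>\S+) \S+ \S+ \[(?P<request_time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<resource>\S+) (?P<protocol>[^"]+)" '
    r'(?P<response>\d{3}) (?P<response_size>\d+|-) "[^"]*" "(?P<user_agent>[^"]*)"'
)

def handler(event, context):
    output = []
    for record in event["records"]:
        line = base64.b64decode(record["data"]).decode("utf-8")
        match = LOG_RE.match(line)
        if match is None:
            # Unparseable lines go to the Kinesis Analytics error stream.
            output.append({"recordId": record["recordId"],
                           "result": "ProcessingFailed",
                           "data": record["data"]})
            continue
        doc = match.groupdict()
        doc["user-agent"] = doc.pop("user_agent")  # match the key used in the sample document
        doc["response"] = int(doc["response"])
        doc["response_size"] = 0 if doc["response_size"] == "-" else int(doc["response_size"])
        output.append({"recordId": record["recordId"],
                       "result": "Ok",
                       "data": base64.b64encode(json.dumps(doc).encode("utf-8")).decode("utf-8")})
    return {"records": output}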
Architecture
To illustrate where the preprocessing step takes place within Kinesis Analytics, take a high-level look at the architecture.
Kinesis Analytics continuously reads data from your Kinesis stream or Kinesis Firehose delivery stream. For each batch of records that it retrieves, the Lambda processor subsystem manages how each batch gets passed to your Lambda function. Your function receives a list of records as input. Within your function, you iterate through the list and apply your business logic to accomplish your preprocessing requirements (such as data transformation).
The input model to your preprocessing function varies slightly, depending on whether the data was received from a stream or delivery stream.
If the source is Kinesis Firehose, the input model is:
{
    "invocationId" : Lambda invocation ID (random GUID),
    "applicationArn" : Kinesis Analytics application ARN,
    "streamArn" : Source delivery stream ARN of the records,
    "records" : [
        {
            "recordId" : random GUID,
            "kinesisFirehoseRecordMetadata" : {
                "approximateArrivalTimestamp" : Approximate time that the delivery stream received the record
            },
            "data" : base64-encoded user payload
        }
    ]
}
If the source is Kinesis Streams, the input data contains a few extra attributes specific to streams:
{
    "invocationId" : Lambda invocation ID (random GUID),
    "applicationArn" : Kinesis Analytics application ARN,
    "streamArn" : Source stream ARN of the records,
    "records" : [
        {
            "recordId" : random GUID,
            "kinesisStreamRecordMetadata" : {
                "sequenceNumber" : from the Kinesis record,
                "partitionKey" : from the Kinesis record,
                "shardId" : from the Kinesis record,
                "approximateArrivalTimestamp" : from the Kinesis record
            },
            "data" : base64-encoded user payload
        }
    ]
}
As you can see, the records attribute is an array of record objects. In your Lambda function, you iterate through each element of the array. Each element’s data attribute contains the base64-encoded record that was retrieved from your input stream or delivery stream. Your function is required to return the modified batch of records using the following model:
{
    "records" : [
        {
            "recordId" : record ID that was passed,
            "result" : string with value Ok, Dropped, or ProcessingFailed,
            "data" : processed base64-encoded user payload
        }
    ]
}
For each input record, the output must contain a corresponding record with the same recordId value. The result attribute for each record varies, depending on the logic that you applied in your function. The acceptable values for result are Ok, Dropped, and ProcessingFailed. If the result is Ok, then the value returned in the data attribute continues in the pipeline to the SQL processor. By setting the value of result to Dropped, you can filter out unwanted records.
If your preprocessing function determines that it cannot process a particular record, it can set result to ProcessingFailed, and Kinesis Analytics writes the original record to its error stream. You can access the contents of the error stream to review failed records and take appropriate action.
The Lambda processor in Kinesis Analytics also manages failures and retries of your Lambda function. In cases where your Lambda function returns a result of ProcessingFailed, Kinesis Analytics writes the original input records to its error stream. Therefore, it’s important that you configure a destination for the error stream in your Kinesis Analytics application. By configuring a destination, such as a delivery stream with an S3 destination, all failed records get delivered to a bucket in S3 for later debugging and analysis.
Example Lambda function
Now that you understand some common preprocessing scenarios, and you know where preprocessing fits in the sequence of your Kinesis Analytics application, put it all together into an example Lambda function.
Assume that a data producer is compressing JSON records before sending them to a Kinesis stream or a Kinesis Firehose delivery stream. You want to use Kinesis Analytics to analyze these compressed records. Before you can use SQL to perform the analysis, you must first decompress each input record so that it’s represented as decompressed JSON. This enables it to map to the schema you’ve created in the Kinesis Analytics application. The function below, written in Node.js, receives a list of records, decompresses each, and returns the complete list of decompressed records:
'use strict';

console.log('Loading function');

const zlib = require('zlib');

exports.handler = (event, context, callback) => {
    let success = 0; // Number of valid entries found
    let failure = 0; // Number of invalid entries found

    /* Process the list of records */
    const output = event.records.map((record) => {
        /* Data is base64-encoded, so decode here */
        const compressedData = Buffer.from(record.data, 'base64');
        try {
            const decompressedData = zlib.unzipSync(compressedData);
            /* Re-encode the decompressed JSON or CSV payload as base64 */
            const result = decompressedData.toString('base64');
            success++;
            return {
                recordId: record.recordId,
                result: 'Ok',
                data: result,
            };
        } catch (err) {
            failure++;
            return {
                recordId: record.recordId,
                result: 'ProcessingFailed',
                data: record.data,
            };
        }
    });

    console.log(`Processing completed. Successful records ${success}, Failed records ${failure}.`);
    callback(null, {
        records: output,
    });
};
Notice how the function sets the result to ProcessingFailed for any record that fails to decompress. In this scenario, Kinesis Analytics writes that failed record to its error stream.
Conclusion
The ability to preprocess your streaming data with Lambda enables you to use Kinesis Analytics to analyze a limitless variety of data. To help you get started, we’ve created several Lambda blueprints in both Python and Node.js. The decompression example used earlier has also been included as a blueprint. The Lambda blueprints can be found in the AWS Lambda Management Console.
Allan MacInnis is a Solutions Architect at Amazon Web Services. He works with our customers to help them build streaming data solutions using Amazon Kinesis. In his spare time, he enjoys mountain biking and spending time with his family.
As much as we get addicted to mobile phones and online services, nobody (outside of cyberpunk fiction) actually lives online. That’s why maps, geolocation services, and geographic information systems (GISes) have come to play a bigger role online. They reflect the way we live, work, travel, socialize, and (in the case of natural or human-made disasters, which come more and more frequently) suffer. Thus there is value in integrating geolocation into existing web sites, but systems like WordPress do not make supporting it easy.
The software development firm LuminFire has contributed to the spread of geolocation services by creating a library for WordPress that helps web sites insert geolocation information into web pages. This article describes how LuminFire surmounted the challenges posed by WordPress and shows a few uses for the library.
The digiKam Team has released version 5.6.0 of the digiKam Software Collection for photo management. “With this version the HTML gallery and the video slideshow tools are back, database shrinking (e.g. purging stale thumbnails) is also supported on MySQL, the grouping items feature has been improved, support for custom sidecar MIME types has been added, the geolocation bookmarks introduce fixes to be fully functional with bundles, and of course lots of bugs have been fixed.”
At this year’s TechCrunch Disrupt NY hackathon, one team presented BackMap, a haptic feedback system which helps visually impaired people to navigate cities and venues. It is assisted by a Raspberry Pi and integrated into a backpack.
Good vibrations with BackMap
The team, including Shashank Sharma, wrote an iOS phone app in Swift, Apple’s open-source programming language. To convert between addresses and geolocations, they used the Esri APIs offered by PubNub. So far, so standard. However, they then configured their BackMap setup so that the user can input their destination via the app, and then follow the route without having to look at a screen or listen to directions. Instead, vibrating motors have been integrated into the straps of a backpack and hooked up to a Raspberry Pi. Whenever the user needs to turn left or right, the Pi makes the respective motor vibrate.
BackMap can also be adapted for indoor navigation by receiving signals from beacons. This could be used to direct users to toilet facilities or exhibition booths at conferences. The team hopes to upgrade the BackMap device to use a wristband format in the future.
Accessible Pi
Here at Pi Towers, we are always glad to see Pi builds for people with disabilities: we’ve seen Sanskriti and Aman’s Braille teacher Mudra, the audio e-reader Valdema by Finnish non-profit Kolibre, and Myrijam and Paul’s award-winning, eye-movement-controlled wheelchair, to name but a few.
Our mission is to bring the power of coding and digital making to everyone, and we are lucky to be part of a diverse community of makers and educators who have often worked proactively to make events and resources accessible to as many people as possible. There is, for example, the autism- and Tourette’s syndrome-friendly South London Raspberry Jam, organised by Femi Owolade-Coombes and his mum Grace. The Raspberry VI website is a portal to all things Pi for visually impaired and blind people. Deaf digital makers may find Jim Roberts’ video tutorials, which are signed in ASL, useful. And anyone can contribute subtitles in any language to our YouTube channel.
If you create or use accessible tutorials, or run a Jam, Code Club, or CoderDojo that is designed to be friendly to people who are neuroatypical or have a disability, let us know how to find your resource or event in the comments!
Abstract: VoLTE (Voice over LTE) is a technology implemented by many operators around the world. Unlike previous 2G/3G technologies, VoLTE offers the possibility of using end-to-end IP networks to handle voice communications. This technology uses VoIP (Voice over IP) standards over IMS (IP Multimedia Subsystem) networks. In this paper, we first introduce the basics of VoLTE technology. We then demonstrate how to use an Android phone to communicate with VoLTE networks and what normal VoLTE communications look like. Finally, we describe various issues and implementation problems. We present vulnerabilities, both passive and active, and attacks that can be carried out using VoLTE Android smartphones against subscribers and operators’ infrastructures. Some of these vulnerabilities are new and not previously disclosed: they may allow an attacker to silently retrieve private information about targeted subscribers, such as their geolocation.
April showers bring May startups! This month we have three hot startups for you to check out. Keep reading to find out what they’re up to, and how they’re using AWS to do it.
Today’s post features the following startups:
Lobster – an AI-powered platform connecting creative social media users to professionals.
Visii – helping consumers find the perfect product using visual search.
Tiqets – a curated marketplace for culture and entertainment.
Lobster (London, England)
Every day, social media users generate billions of authentic images and videos to rival typical stock photography. Powered by Artificial Intelligence, Lobster enables brands, agencies, and the press to license visual content directly from social media users so they can find that piece of content that perfectly fits their brand or story. Lobster does the work of sorting through major social networks (Instagram, Flickr, Facebook, Vk, YouTube, and Vimeo) and cloud storage providers (Dropbox, Google Photos, and Verizon) to find media, saving brands and agencies time and energy. Using filters like gender, color, age, and geolocation can help customers find the unique content they’re looking for, while Lobster’s AI and visual recognition finds images instantly. Lobster also runs photo challenges to help customers discover the perfect image to fit their needs.
Lobster is an excellent platform for creative people to get their work discovered while also protecting their content. Users are treated as copyright holders and earn 75% of the final price of every sale. The platform is easy to use: new users simply sign in with an existing social media or cloud account and can start showcasing their artistic talent right away. Lobster allows users to connect to any number of photo storage sources so they’re able to choose which items to share and which to keep private. Once users have selected their favorite photos and videos to share, they can sit back and watch as their work is picked to become the signature for a new campaign or featured on a cool website – and start earning money for their work.
Lobster is using a variety of AWS services to keep everything running smoothly. The company uses Amazon S3 to store photography that was previously ordered by customers. When a customer purchases content, the respective piece of content must be available at any given moment, independent from the original source. Lobster is also using Amazon EC2 for its application servers and Elastic Load Balancing to monitor the state of each server.
Visii
In today’s vast web, a growing number of products are being sold online and searching for something specific can be difficult. Visii was created to cater to businesses and help them extract value from an asset they already have – their images. Their SaaS platform allows clients to leverage an intelligent visual search on their websites and apps to help consumers find the perfect product for them. With Visii, consumers can choose an image and immediately discover more based on their tastes and preferences. Whether it’s clothing, artwork, or home decor, Visii will make recommendations to get consumers to search visually and subsequently help businesses increase their conversion rates.
There are multiple ways for businesses to integrate Visii on their website or app. Many of Visii’s clients choose to build against their API, but Visii also works closely with many clients to figure out the most effective approach for each unique case. This has led Visii to help build innovative user interfaces and identify the best integration points to get consumers to search visually. Businesses can also integrate Visii on their website with a widget – they just need to provide a list of links to their products and Visii does the rest.
Visii runs their entire infrastructure on AWS. Their APIs and pipeline all sit in auto-scaling groups, with ELBs in front of them, sending things across into Amazon Simple Queue Service and Amazon Aurora. Recently, Visii moved from Amazon RDS to Aurora and noted that the process was incredibly quick and easy. Because they make heavy use of machine learning, it is crucial that their pipeline only runs when required and that they maximize the efficiency of their uptime.
Tiqets
Tiqets is making the ticket-buying experience faster and easier for travelers around the world. Founded in 2013, Tiqets is one of the leading curated marketplaces for admission tickets to museums, zoos, and attractions. Their mission is to help travelers get the most out of their trips by helping them find and experience a city’s culture and entertainment. Tiqets partners directly with vendors to adapt to a customer’s specific needs, and is now active in over 30 cities in the US, Europe, and the Middle East.
With Tiqets, travelers can book tickets either ahead of time or at their destination for a wide range of attractions. The Tiqets app provides real-time availability and delivers tickets straight to customers’ phones via email, direct download, or in the app. Customers save time by skipping long lines (a perk of the app!), save trees (no need to physically print tickets), and most importantly, they can make the most of their leisure time. For each attraction featured on Tiqets, there is a lot of helpful information, including the best modes of transportation, hours, commonly asked questions, and reviews from other customers.
The Tiqets platform consists of the consumer-facing website, the internal and external-facing APIs, and the partner self-service portals. For the app hosting and infrastructure, Tiqets uses AWS services such as Elastic Load Balancing, Amazon EC2, Amazon RDS, Amazon CloudFront, Amazon Route 53, and Amazon ElastiCache. Through the infrastructure orchestration of their AWS configuration, they can easily set up separate development or test environments while staying close to the production environment as well.
Tiqets is hiring! Be sure to check out their jobs page if you are interested in joining the Tiqets team.
—
Thanks for reading and don’t forget to check out April’s Hot Startups if you missed it.
In today’s business environments, data is generated in a continuous fashion by a steadily increasing number of diverse data sources. Therefore, the ability to continuously capture, store, and process this data to quickly turn high volume streams of raw data into actionable insights has become a substantial competitive advantage for organizations.
Apache Flink is an open source project that is well suited to form the basis of such a stream processing pipeline. It offers unique capabilities that are tailored to the continuous analysis of streaming data. However, building and maintaining a pipeline based on Flink often requires considerable expertise, in addition to physical resources and operational efforts.
This post outlines a reference architecture for a consistent, scalable, and reliable stream processing pipeline that is based on Apache Flink using Amazon EMR, Amazon Kinesis, and Amazon Elasticsearch Service. An AWSLabs GitHub repository provides the artifacts that are required to explore the reference architecture in action. Resources include a producer application that ingests sample data into an Amazon Kinesis stream and a Flink program that analyzes the data in real time and sends the results to Amazon ES for visualization.
Analyzing geospatial taxi data in real time
Consider a scenario related to optimizing taxi fleet operations. You obtain information continuously from a fleet of taxis currently operating in New York City. Using this data, you want to optimize the operations by analyzing the gathered data in real time and making data-based decisions.
You would like, for instance, to identify hot spots—areas that are currently in high demand for taxis—so that you can direct unoccupied taxis there. You also want to track current traffic conditions so that you can give approximate trip durations to customers, for example, for rides to the nearby airports. Naturally, your decisions should be based on information that closely reflects the current demand and traffic conditions. The incoming data needs to be analyzed in a continuous and timely fashion. Relevant KPIs and derived insights should be accessible to real-time dashboards.
For the purpose of this post, you emulate a stream of trip events by replaying a dataset of historic taxi trips collected in New York City into Amazon Kinesis Streams. The dataset is available from the New York City Taxi & Limousine Commission website. It contains information on the geolocation and collected fares of individual taxi trips.
In more realistic scenarios, you could leverage AWS IoT to collect the data from telemetry units installed in the taxis and then ingest the data into an Amazon Kinesis stream.
Architecture of a reliable and scalable stream processing pipeline
Because the pipeline serves as the central tool to operate and optimize the taxi fleet, it’s crucial to build an architecture that is tolerant against the failure of single nodes. The pipeline should adapt to changing rates of incoming events. Therefore, you should separate the ingestion of events, their actual processing, and the visualization of the gathered insights into different components. By loosely coupling these components of the infrastructure and using managed services, you can increase the robustness of the pipeline in case of failures. You can also scale the different parts of your infrastructure individually and reduce the efforts that are required to build and operate the entire pipeline.
By decoupling the ingestion and storage of events sent by the taxis from the computation of queries deriving the desired insights, you can substantially increase the robustness of the infrastructure.
Events are initially persisted by means of Amazon Kinesis Streams, which holds a replayable, ordered log and redundantly stores events in multiple Availability Zones. Later, the events are read from the stream and processed by Apache Flink. As Flink continuously snapshots its internal state, the failure of an operator or entire node can be recovered by restoring the internal state from the snapshot and replaying events that need to be reprocessed from the stream.
Another advantage of a central log for storing events is the ability to consume the data with multiple applications. It is feasible to run different versions of a Flink application side by side for benchmarking and testing purposes. Or, you could use Amazon Kinesis Firehose to persist the data from the stream to Amazon S3 for long-term archival and subsequent historical analysis using Amazon Athena.
Because Amazon Kinesis Streams, Amazon EMR, and Amazon ES are managed services that can be created and scaled by means of simple API calls, using these services allows you to focus your expertise on providing business value. Let AWS do the undifferentiated heavy lifting that is required to build and, more importantly, operate and scale the entire pipeline. The creation of the pipeline can be fully automated with AWS CloudFormation and individual components can be monitored and automatically scaled by means of Amazon CloudWatch. Failures are detected and automatically mitigated.
For the rest of this post, I focus on aspects that are related to building and running the reference architecture on AWS. To learn more about Flink, see the Flink training session that discusses how to implement Flink programs using its APIs. The scenario used in the session is fairly similar to the one discussed in this post.
Building and running the reference architecture
To see the taxi trip analysis application in action, use two CloudFormation templates to build and run the reference architecture:
The first template builds the runtime artifacts for ingesting taxi trips into the stream and for analyzing trips with Flink
The second template creates the resources of the infrastructure that run the application
The resources that are required to build and run the reference architecture, including the source code of the Flink application and the CloudFormation templates, are available from the flink-stream-processing-refarch AWSLabs GitHub repository.
Building the runtime artifacts and creating the infrastructure
Execute the first CloudFormation template to create an AWS CodePipeline pipeline, which builds the artifacts by means of AWS CodeBuild in a serverless fashion. You can also install Maven and build the Flink Amazon Kinesis connector and the other runtime artifacts manually. After all stages of the pipeline complete successfully, you can retrieve the artifacts from the S3 bucket that is specified in the output section of the CloudFormation template.
When the first template is created and the runtime artifacts are built, execute the second CloudFormation template, which creates the resources of the reference architecture described earlier.
Wait until both templates have been created successfully before proceeding to the next step. This takes up to 15 minutes, so feel free to get a fresh cup of coffee while CloudFormation does all the work for you.
Starting the Flink runtime and submitting a Flink program
To start the Flink runtime and submit the Flink program that is doing the analysis, connect to the EMR master node. The parameters of this and later commands can be obtained from the output sections of the two CloudFormation templates, which have been used to provision the infrastructure and build the runtime artifacts.
$ ssh -C -D 8157 «EMR master node IP»
The EMR cluster that is provisioned by the CloudFormation template comes with two c4.xlarge core nodes with four vCPUs each. Generally, you match the number of node cores to the number of slots per task manager. For this post, it is reasonable to start a long-running Flink cluster with two task managers and four slots per task manager:
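A YARN session along the following lines does this (memory settings are omitted and flag names vary between Flink versions; -n sets the number of task managers, -s the slots per task manager, and -d detaches the session):

$ flink-yarn-session -n 2 -s 4 -d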
After the Flink runtime is up and running, the taxi stream processor program can be submitted to the Flink runtime to start the real-time analysis of the trip events in the Amazon Kinesis stream.
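Submission is a single command from the master node. The jar name and parameters below are illustrative; the actual values come from the output sections of the CloudFormation templates:

$ flink run -p 8 flink-taxi-stream-processor-1.0.jar --region «AWS region» --stream «Kinesis stream name» --es-endpoint «Amazon ES endpoint»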
Now that the Flink application is running, it is reading the incoming events from the stream, aggregating them in time windows according to the time of the events, and sending the results to Amazon ES. The Flink application takes care of batching records so as not to overload the Elasticsearch cluster with small requests and of signing the batched requests to enable a secure configuration of the Elasticsearch cluster.
Ingesting trip events into the Amazon Kinesis stream
To ingest the events, use the taxi stream producer application, which replays a historic dataset of taxi trips recorded in New York City from S3 into an Amazon Kinesis stream with eight shards. In addition to the taxi trips, the producer application also ingests watermark events into the stream so that the Flink application can determine the time up to which the producer has replayed the historic dataset.
This application is by no means specific to the reference architecture discussed in this post. You can easily reuse it for other purposes as well, for example, building a similar stream processing architecture based on Amazon Kinesis Analytics instead of Apache Flink.
Exploring the Kibana dashboard
Now that the entire pipeline is running, you can finally explore the Kibana dashboard that displays insights that are derived in real time by the Flink application.
For the purpose of this post, the Elasticsearch cluster is configured to accept connections from the IP address range specified as a parameter of the CloudFormation template that creates the infrastructure. For production-ready applications, this may not always be desirable or possible. For more information about how to securely connect to your Elasticsearch cluster, see the Set Access Control for Amazon Elasticsearch Service post on the AWS Database blog.
In the Kibana dashboard, the map on the left visualizes the start points of taxi trips. The redder a rectangle is, the more taxi trips started in that location. The line chart on the right visualizes the average duration of taxi trips to John F. Kennedy International Airport and LaGuardia Airport, respectively.
Given this information, taxi fleet operations can be optimized by proactively sending unoccupied taxis to locations that are currently in high demand, and by estimating trip durations to the local airports more precisely.
You can now scale the underlying infrastructure. For example, scale the shard capacity of the stream, change the instance count or the instance types of the Elasticsearch cluster, and verify that the entire pipeline remains functional and responsive even during the rescale operation.
Running Apache Flink on AWS
As you have just seen, the Flink runtime can be deployed by means of YARN, so EMR is well suited to run Flink on AWS. However, there are some AWS-related considerations that need to be addressed to build and run the Flink application:
Building the Flink Amazon Kinesis connector
Adapting the Amazon Kinesis consumer configuration
Enabling event time processing by submitting watermarks to Amazon Kinesis
Connecting Flink to Amazon ES
Building the Flink Amazon Kinesis connector
Flink provides a connector for Amazon Kinesis streams. In contrast to other Flink artifacts, the Amazon Kinesis connector is not available from Maven central, so you need to build it yourself. I recommend building Flink with Maven 3.2.x instead of the more recent Maven 3.3.x release, as Maven 3.3.x may produce outputs with improperly shaded dependencies.
Adapting the Amazon Kinesis consumer configuration
Flink recently introduced support for obtaining AWS credentials from the role that is associated with an EMR cluster. Enable this functionality in the Flink application source code by setting the AWS_CREDENTIALS_PROVIDER property to AUTO and by omitting any AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY parameters from the Properties object.
Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
Credentials are automatically retrieved from the instance’s metadata and there is no need to store long-term credentials in the source code of the Flink application or on the EMR cluster.
As the producer application ingests thousands of events per second into the stream, it helps to increase the number of records fetched by Flink in a single GetRecords call. Change this value to the maximum value that is supported by Amazon Kinesis.
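With the Flink Kinesis connector, that looks roughly like the following line added to the configuration above (the constant name is taken from recent connector versions; 10,000 records is the GetRecords per-call maximum):

kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10000");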
Enabling event time processing by submitting watermarks to Amazon Kinesis
Flink supports several notions of time, most notably event time. Event time is desirable for streaming applications as it results in very stable semantics of queries. The time of events is determined by the producer or close to the producer. The reordering of events due to network effects has substantially less impact on query results.
To realize event time, Flink relies on watermarks that are sent by the producer in regular intervals to signal the current time at the source to the Flink runtime. When integrating with Amazon Kinesis Streams, there are two different ways of supplying watermarks to Flink:
Manually adding watermarks to the stream
Relying on ApproximateArrivalTime, which is automatically added to events on their ingestion to a stream
By just setting the time model to event time on an Amazon Kinesis stream, Flink automatically uses the ApproximateArrivalTime value supplied by Amazon Kinesis.
Alternatively, you can choose to use the time that is determined by the producer by specifying a custom Timestamp Assigner operator that extracts the watermark information from the corresponding events of the stream.
If you rely on PunctuatedAssigner, it is important to ingest watermarks to all individual shards, as Flink processes each shard of a stream individually. This can be realized by enumerating the shards of a stream. Ingest watermarks to specific shards by explicitly setting the hash key to the hash range of the shard to which the watermark should be sent.
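As a hedged illustration of that enumeration approach (the reference implementation is the Java producer in the repository; the stream name and watermark layout here are assumptions), a boto3 sketch might look like this:

import json
import time

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # region is an assumption

def send_watermarks(stream_name):
    # Put one watermark event into every shard of the stream.
    watermark = json.dumps({"type": "watermark", "timestamp": int(time.time() * 1000)})
    shards = kinesis.list_shards(StreamName=stream_name)["Shards"]  # pagination omitted
    for shard in shards:
        kinesis.put_record(
            StreamName=stream_name,
            Data=watermark.encode("utf-8"),
            PartitionKey="watermark",
            # ExplicitHashKey overrides the partition-key hash, pinning the
            # record to this specific shard's hash range.
            ExplicitHashKey=shard["HashKeyRange"]["StartingHashKey"],
        )

send_watermarks("taxi-trip-events")  # hypothetical stream name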
The producer that is ingesting the taxi trips into Amazon Kinesis uses the latter approach. You can explore the details of the implementation in the flink-stream-processing-refarch AWSLabs GitHub repository.
Connecting Flink to Amazon ES
Flink provides several connectors for Elasticsearch. However, all these connectors merely support the TCP transport protocol of Elasticsearch, whereas Amazon ES relies on the HTTP protocol. As of Elasticsearch 5, the TCP transport protocol is deprecated. While an Elasticsearch connector for Flink that supports the HTTP protocol is still in the works, you can use the Jest library to build a custom sink able to connect to Amazon ES. The sink should be capable of signing requests with IAM credentials.
final AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain();
final Supplier<LocalDateTime> clock = () -> LocalDateTime.now(ZoneOffset.UTC);
final AWSSigner awsSigner = new AWSSigner(credentialsProvider, "«region»", "es", clock);

final AWSSigningRequestInterceptor requestInterceptor = new AWSSigningRequestInterceptor(awsSigner);

final JestClientFactory factory = new JestClientFactory() {
    @Override
    protected HttpClientBuilder configureHttpClient(HttpClientBuilder builder) {
        builder.addInterceptorLast(requestInterceptor);
        return builder;
    }

    @Override
    protected HttpAsyncClientBuilder configureHttpClient(HttpAsyncClientBuilder builder) {
        builder.addInterceptorLast(requestInterceptor);
        return builder;
    }
};

factory.setHttpClientConfig(new HttpClientConfig
    .Builder("«es endpoint»")
    .multiThreaded(true)
    .build());

final JestClient client = factory.getObject();
For the full implementation details of the Elasticsearch sink, see the flink-taxi-stream-processor AWSLabs GitHub repository, which contains the source code of the Flink application.
Summary
This post discussed how to build a consistent, scalable, and reliable stream processing architecture based on Apache Flink. It illustrates how to leverage managed services to reduce the expertise and operational effort that is usually required to build and maintain a low latency and high throughput stream processing pipeline, so that you can focus your expertise on providing business value.
Start using Apache Flink on Amazon EMR today. The AWSLabs GitHub repository contains the resources that are required to run through the given example and includes further information that helps you to get started quickly.
If you have questions or suggestions, please comment below.
About the Author
Dr. Steffen Hausmann is a Solutions Architect with Amazon Web Services. He has a strong background in the area of complex event and stream processing and supports customers on their cloud journey. In his spare time, he likes hiking in the nearby mountains.
CrowdStrike has an interesting blog post about how the Russian military has been tracking Ukrainian field artillery units by compromising soldiers’ smartphones.
Ray Zhu from the Amazon Kinesis team wrote this great post about how to set up a streaming data pipeline. He carefully shows you step by step how he set it all up and how you can do it too.
-Ana
Consumer demand for better experiences is ever increasing today. Companies across different industry segments are looking for ways to differentiate their products and services. Data is a key ingredient for providing differentiated products and services, and this is no longer a secret but rather a well-adopted practice. Almost all companies of meaningful size use some sort of data technology, which means that being able to collect and use data is no longer enough as a differentiating factor. Then what? How fast you can collect and use your data becomes the key to staying competitive.
Streaming data technologies shorten the time to analyze and use your data from hours and days to minutes and seconds. Let’s walk through an example of using Amazon Kinesis Firehose, Amazon Redshift, and Amazon QuickSight to set up a streaming data pipeline and visualize Maryland traffic violation data in real time.
Data Flow Overview
Step 1 Set up Redshift database and table
In this step, I’ll set up a Redshift table for Kinesis Firehose to continuously load streaming data into. I first start a single node Redshift cluster and name it “raystreaming.”
Under “Additional Configuration”, I make the cluster publicly accessible so that Kinesis Firehose and QuickSight can connect to my cluster.
After reviewing all configurations, I click on “Launch Cluster”.
Once the cluster is active, I go to the cluster’s VPC Security Groups to add inbound access for Kinesis Firehose service IPs and outbound access for QuickSight service IPs.
Kinesis Firehose service IPs:
US East (N. Virginia)
52.70.63.192/27
US West (Oregon)
52.89.255.224/27
EU (Ireland)
52.19.239.192/27
QuickSight service IPs:
US East (N. Virginia)
52.23.63.224/27
US West (Oregon) (us-west-2)
54.70.204.128/27
EU (Ireland) (eu-west-1)
52.210.255.224/27
Now the cluster is set up and configured. I’ll use a JDBC tool and the SQL statement below to create a table for storing Maryland traffic violation data.
create table TrafficViolation(
  dateofstop date,
  timeofstop timestamp,
  agency varchar(100),
  subagency varchar(100),
  description varchar(300),
  location varchar(100),
  latitude varchar(100),
  longtitude varchar(100),
  accident varchar(100),
  belts varchar(100),
  personalinjury varchar(100),
  propertydamage varchar(100),
  fatal varchar(100),
  commlicense varchar(100),
  hazmat varchar(100),
  commvehicle varchar(100),
  alcohol varchar(100),
  workzone varchar(100),
  state varchar(100),
  veichletype varchar(100),
  year varchar(100),
  make varchar(100),
  model varchar(100),
  color varchar(100),
  violation varchar(100),
  type varchar(100),
  charge varchar(100),
  article varchar(100),
  contributed varchar(100),
  race varchar(100),
  gender varchar(100),
  drivercity varchar(100),
  driverstate varchar(100),
  dlstate varchar(100),
  arresttype varchar(100),
  geolocation varchar(100)
);
Step 2 Set up Kinesis Firehose delivery stream
In this step, I’ll set up a Kinesis Firehose delivery stream to continuously deliver data to the “TrafficViolation” table created above.
I name my Firehose delivery stream “rayredshiftstream”. Under destination configurations, I choose “Amazon Redshift” as the destination and configure an intermediate S3 bucket. Kinesis Firehose will first load my streaming data into this intermediate bucket and then COPY it into Redshift. Loading data from S3 into Redshift is efficient and preserves resources on Redshift for queries. Also, I always have a backup of my data in S3 for other batch processes or in case my Redshift cluster is not accessible (e.g. under maintenance).
Subsequently, I enter the Redshift cluster, database, and table names along with Redshift user name and password. This user needs to have Redshift INSERT permission. I also specify “json ‘auto’” under COPY options to parse JSON formatted sample data.
I set the retry duration to 30 seconds. If a data load to my Redshift cluster fails, Kinesis Firehose retries for 30 seconds. The failed data always remains in the intermediate S3 bucket for backfill. At the bottom, the exact COPY command that Kinesis Firehose will use is generated for testing purposes.
On the next page, I specify the buffer size and buffer interval. Kinesis Firehose buffers streaming data up to a certain size or for a certain period of time before loading it to S3. Kinesis Firehose’s buffering feature reduces S3 PUT requests and cost significantly, and generates relatively larger S3 objects for efficient data loads into Redshift. I’m using the smallest buffer size (1 MB) and shortest buffer interval (60 seconds) in this example in order to have data delivered sooner.
You can also optionally configure Kinesis Firehose to compress the data in GZIP format before loading it to S3 and use a KMS key to encrypt the data in S3. In this example, I configure my data to be uncompressed and unencrypted. Please note that if you enable GZIP compression, you’ll also need to add “gzip” under Redshift COPY options.
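Putting those options together, the COPY command issued against Redshift looks roughly like the following; the bucket, prefix, and role ARN are placeholders, and the authoritative command is the one generated in the delivery stream configuration:

COPY TrafficViolation
FROM 's3://«intermediate bucket»/«prefix»'
CREDENTIALS 'aws_iam_role=«delivery role ARN»'
json 'auto' gzip;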
I also enable error logging for Kinesis Firehose to log any delivery errors to my CloudWatch Log group. The error messages are viewable from the Kinesis Firehose console as well and are particularly useful for troubleshooting purposes.
Finally, I configure a default IAM role to allow Kinesis Firehose to access the resources I configured in the delivery stream.
After reviewing all configurations, I click on “Create Delivery Stream”.
Step 3 Send data to Kinesis Firehose delivery stream
Now my Firehose delivery stream is set up and pointing to my Redshift table “TrafficViolation”. In this example, I’m using the Traffic Violations dataset from US Government Open Data. I use the Kinesis Firehose sample from the AWS Java SDK to parse records from a local CSV file and send each record to my delivery stream.
In real streaming use cases, you can imagine that each data record is pushed to the delivery stream from police officers’ cellular devices through Firehose’s PutRecord() or PutRecordBatch() APIs as soon as a violation ticket is recorded.
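A hedged Python sketch of that producer call is below; because the delivery stream COPYs with json ‘auto’, each record is sent as a JSON object whose keys match the table’s column names, and the field values shown are illustrative:

import json

import boto3

firehose = boto3.client("firehose", region_name="us-east-1")  # region is an assumption

def send_violation(record):
    # Push one traffic-violation record to the delivery stream.
    firehose.put_record(
        DeliveryStreamName="rayredshiftstream",
        Record={"Data": (json.dumps(record) + "\n").encode("utf-8")},
    )

send_violation({"dateofstop": "2014-09-30", "timeofstop": "2014-09-30 23:51:00", "agency": "MCP"})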
A sample of the data looks like the following and includes information such as time of stop, vehicle type, driver gender, and so forth.
09/30/2014,23:51:00,MCP,"1st district, Rockville",\
DRIVER FAILURE TO STOP AT STEADY CIRCULAR RED SIGNAL,\
PARK RD AT HUNGERFORD DR,,,No,No,No,No,No,No,No,No,No,No,\
MD,02 - Automobile,2014,FORD,MUSTANG,BLACK,Citation,21-202(h1),\
Transportation Article,No,BLACK,M,ROCKVILLE,MD,MD,A - Marked Patrol,
Step 4 Visualize the data from QuickSight
As I continuously push data records to my delivery stream “rayredshiftstream”, I can see this data get populated into my Redshift table “TrafficViolation” continuously.
Now I’m going to use QuickSight to analyze and visualize the data from my Redshift table “TrafficViolation”. I create a new analysis and a new data set pointing to my Redshift table “TrafficViolation”.
I use “Query” mode to directly retrieve data from my Redshift cluster, so that new data is picked up as it is continuously streamed in from Kinesis Firehose.
With a few clicks, I create a bar chart that displays the number of traffic violations by gender and hour of the day. There are a few interesting patterns: 1) Male drivers have significantly more traffic violations than female drivers during morning hours. 2) Noon has the lowest number of violations. 3) From 2pm to 4pm, the gap in violations between male and female drivers narrows.
With a live dashboard, this graph will keep updating itself throughout the day as new data continuously streams from police officers’ devices to Redshift through Kinesis Firehose. Another interesting live dashboard to build would be a map graph showing a heat map of traffic violations across different districts of Maryland over time. I’ll leave this exercise to the readers of this blog; you can use your favorite Business Intelligence tools to do so.
That’s it!
Hopefully, by reading this blog and trying it out yourself, you’ve gotten some inspiration about streaming data and a sense of how easy it is to get started with streaming data analytics on AWS. I cannot wait to see what streaming data analytics pipelines and applications you can build for your organizations!
“People You May Know are people on Facebook that you might know,” a Facebook spokesperson said. “We show you people based on mutual friends, work and education information, networks you’re part of, contacts you’ve imported and many other factors.”
One of those factors is smartphone location. A Facebook spokesperson said though that shared location alone would not result in a friend suggestion, saying that the two parents must have had something else in common, such as overlapping networks.
“Location information by itself doesn’t indicate that two people might be friends,” said the Facebook spokesperson. “That’s why location is only one of the factors we use to suggest people you may know.”
The article goes on to describe situations where you don’t want Facebook to do this: Alcoholics Anonymous meetings, singles bars, some Tinder dates, and so on. But this is part of Facebook’s aggressive use of location data in many of its services.