Amazon Athena: Beyond The Basics – Part 1

Working with Twitter (complex JSON) data set

Amazon Athena is a serverless interactive query service that allows analytics using standard SQL for data residing in S3. Before Athena, querying data sets on S3 required installing Hive/Presto/Hue or similar tools on top of the EMR service, or integrating with other third-party partner products.

Athena also supports JDBC connectivity, so the managed service can be easily integrated with a wide variety of SQL and visualization tools.

https://aws.amazon.com/athena/

Many customers are interested in exploring Amazon Athena for their use cases and are looking for ways to optimize for performance and cost. As an APN Partner, NorthBay has been working with Athena to test and explore various customer use cases. This is a multi-part blog series to share our findings and to give the audience a jumpstart on working with Amazon Athena.

Twitter use case

Unstructured and semi-structured data (typically JSON) is becoming typical for big data sets. We chose Twitter data as the data set to validate working with complex JSON in Athena. This blog post shares the details of querying Twitter data using Athena and executing complex queries against the data set.

The following is the architecture followed for the implementation:

[Architecture diagram]

– Configure Twitter for API access

– Configure Kinesis Firehose to stream the output to S3

– Configure and run Tweepy to read Twitter feed and stream to Kinesis Firehose

– Define schema definition in Athena

– Query Twitter data from Athena Query Editor

– Query Twitter data using JDBC connection

– Query Twitter data from Quicksight

Configure Twitter for API access

To create this platform, you will need an AWS account and a Twitter application. Sign in with your Twitter account and create a new application at https://apps.twitter.com/. Make sure your application is set for ‘read-only’ access. Next, choose Create My Access Token at the bottom of the Keys and Access Tokens tab. By this point, you should have four Twitter application keys: consumer key (API key), consumer secret (API secret), access token, and access token secret. Take note of these keys.

https://aws.amazon.com/blogs/big-data/building-a-near-real-time-discovery-platform-with-aws/
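
The feeder script shown later in this post reads these keys from a small configuration module via import config. A minimal sketch of that module (config.py) is shown below; the values are placeholders – use your own application keys.

# config.py – Twitter application credentials (placeholder values)
consumer_key = 'YOUR_CONSUMER_KEY'
consumer_secret = 'YOUR_CONSUMER_SECRET'
access_token = 'YOUR_ACCESS_TOKEN'
access_secret = 'YOUR_ACCESS_TOKEN_SECRET'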

Configure Kinesis Firehose to stream the output to S3

Create a Kinesis Firehose delivery stream to receive our streaming data and deliver it to S3.


Step 1: Configure Destination: Choose “Amazon S3” as the destination and select an existing S3 bucket, or choose “Create bucket” to create a new one for Firehose to persist the data.


Once the bucket is created, add a prefix for the data. In this case, a json/ prefix is added so that all JSON data lands under the same bucket/prefix.

Step 2: Configuration: Kinesis Firehose allows configuration of buffer size, buffer interval, compression, encryption, and security (IAM) policies. Choose these values based on the streaming ingest frequency and the optimal size of the output files in S3.


When Firehose asks for an IAM role, we just have to click “Allow” in the new window without changing anything.

Step 3: Review:

Review the configuration and click “Create Delivery Stream”.


In the Firehose Delivery Stream console, we can see the newly created delivery stream with the status “CREATING”. Once the status changes to “ACTIVE”, we can start using the delivery stream.
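
For repeatable setups, the same delivery stream can also be created programmatically. The following is a minimal boto3 sketch; the role ARN, bucket ARN, and buffer settings are placeholder assumptions, and the IAM role must already allow Firehose to write to the bucket.

import boto3

firehose = boto3.client('firehose')

# Create a DirectPut delivery stream that writes incoming records to s3://twitter-to-s3/json/
# NOTE: the role ARN and bucket ARN below are placeholders – substitute your own.
firehose.create_delivery_stream(
    DeliveryStreamName='twitter-to-s3',
    DeliveryStreamType='DirectPut',
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-to-s3-role',
        'BucketARN': 'arn:aws:s3:::twitter-to-s3',
        'Prefix': 'json/',
        'BufferingHints': {'SizeInMBs': 5, 'IntervalInSeconds': 300},
        'CompressionFormat': 'UNCOMPRESSED'
    }
)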

Ingest Twitter feeds from the feeder system (Tweepy/Python)

http://docs.tweepy.org/en/v3.5.0/index.html

We need a stream producer/feeder system to publish streaming data to Kinesis Firehose. Tweepy is an open-source Python library that enables communication with the Twitter API. The following code can be run on an EC2 instance (with an IAM role that allows access to Kinesis Firehose, and with the Twitter API credentials from the earlier step stored in a configuration file) to feed the stream that we created earlier.

import tweepy
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
import time
import argparse
import string
import config
import boto3


def get_parser():
    """Get parser for command line arguments."""
    parser = argparse.ArgumentParser(description="Twitter Downloader")
    parser.add_argument("-q",
                        "--query",
                        dest="query",
                        help="Query/Filter",
                        default='-')
    parser.add_argument("-d",
                        "--data-dir",
                        dest="data_dir",
                        help="Output/Data Directory")
    return parser


class MyListener(StreamListener):
    """Custom StreamListener that forwards incoming tweets to Kinesis Firehose."""

    def __init__(self, data_dir, query):
        super(MyListener, self).__init__()
        query_fname = format_filename(query)
        self.outfile = "%s/stream_%s.json" % (data_dir, query_fname)

    def on_data(self, data):
        try:
            result = send_record_to_firehose(data)
            print(result)
        except BaseException as e:
            print("Error on_data: %s" % str(e))
            time.sleep(5)
        return True

    def on_error(self, status):
        print("Error with status: " + str(status))
        if status == 420:
            print("You are being rate limited!")
        return True


def send_record_to_firehose(data):
    """Send the raw JSON tweet to the Kinesis Firehose delivery stream.

    Arguments:
        data -- raw JSON payload received from Twitter
    Return:
        dict -- response from Kinesis Firehose
    """
    client = boto3.client('firehose')
    response = client.put_record(
        DeliveryStreamName='twitter-to-s3',
        Record={
            'Data': data
        }
    )
    return response


def format_filename(fname):
    """Convert a file name into a safe string.

    Arguments:
        fname -- the file name to convert
    Return:
        String -- converted file name
    """
    return ''.join(convert_valid(one_char) for one_char in fname)


def convert_valid(one_char):
    """Convert a character into '_' if invalid.

    Arguments:
        one_char -- the char to convert
    Return:
        Character -- converted char
    """
    valid_chars = "-_.%s%s" % (string.ascii_letters, string.digits)
    if one_char in valid_chars:
        return one_char
    else:
        return '_'


if __name__ == '__main__':
    parser = get_parser()
    args = parser.parse_args()
    auth = OAuthHandler(config.consumer_key, config.consumer_secret)
    auth.set_access_token(config.access_token, config.access_secret)
    api = tweepy.API(auth)
    # Reconnect and keep tracking if the stream drops
    while True:
        try:
            twitter_stream = Stream(auth, MyListener(args.data_dir, args.query))
            twitter_stream.filter(track=[args.query])
        except Exception:
            # Connection dropped – reconnect and keep tracking
            continue

Define schema definition in Athena

Athena provides a Catalog Manager UI to define new tables. However, with complex JSON it is easier to run the schema-definition DDL in the Query Editor. The following DDL is built based on the Twitter data stream, with most fields mapped to string types for simplicity:

CREATE EXTERNAL TABLE IF NOT EXISTS tweets (
  created_at string,
  id string,
  id_str string,
  text string,
  display_text_range array<string>,
  source string,
  truncated string,
  user struct<id:string, id_str:string, name:string, screen_name:string, location:string, description:string, protected:string, verified:string, followers_count:string, friends_count:string, listed_count:string, favourites_count:string, statuses_count:string, created_at:string, utc_offset:string, time_zone:string, geo_enabled:string, lang:string>,
  is_quote_status string,
  extended_tweet struct<full_text:string,
    display_text_range:array<string>,
    entities:struct<media:array<struct<id:string,
      id_str:string,
      indices:array<string>,
      media_url:string,
      media_url_https:string,
      url:string,
      display_url:string,
      expanded_url:string,
      type:string,
      sizes:struct<small:struct<w:string, h:string, resize:string>,
        thumb:struct<w:string, h:string, resize:string>>>>>>,
  retweet_count string,
  favorite_count string,
  retweeted_status struct<retweet_count:string, text:string>,
  entities struct<urls:array<struct<url:string, expanded_url:string, display_url:string, indices:array<string>>>,
    user_mentions:array<struct<screen_name:string, name:string, id:string, id_str:string, indices:array<string>>>,
    hashtags:array<struct<text:string, indices:array<string>>>>,
  favorited string,
  retweeted string,
  possibly_sensitive string,
  filter_level string,
  lang string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://twitter-to-s3/json/';

Query Twitter data from Athena Query Editor

The Athena Query Editor provides a UI to submit queries to Athena. The response also captures the run time and the amount of data scanned, which is incredibly useful for estimating costs and optimizing queries.


Query 1: Total records in the table

SELECT count(*) FROM tweets

Query 2: Get sample of 10000 records

SELECT * FROM tweets LIMIT 10000
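
These queries can also be submitted programmatically. The sketch below uses the boto3 Athena API (available in recent boto3 releases) to run a query and read back the run time and data-scanned statistics; the output location is a placeholder staging bucket, and the tweets table is assumed to live in the default database.

import time
import boto3

athena = boto3.client('athena')

# Submit the query; results are staged to the (placeholder) S3 output location.
execution = athena.start_query_execution(
    QueryString='SELECT count(*) FROM tweets',
    QueryExecutionContext={'Database': 'default'},
    ResultConfiguration={'OutputLocation': 's3://my-athena-results-bucket/'}
)
query_id = execution['QueryExecutionId']

# Poll until the query completes, then print the statistics Athena reports.
while True:
    query = athena.get_query_execution(QueryExecutionId=query_id)['QueryExecution']
    state = query['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(1)

stats = query.get('Statistics', {})
print(state, stats.get('EngineExecutionTimeInMillis'), stats.get('DataScannedInBytes'))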

Query Twitter data from SQL client with JDBC connection

Detailed documentation on establishing a connection to Athena from a client tool such as SQL Workbench is available at the following link:

http://docs.aws.amazon.com/athena/latest/ug/connect-with-jdbc.html
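
As one way to automate this from code, the sketch below uses the open-source JayDeBeApi package to run a query over the Athena JDBC driver. The driver class name, JDBC URL, and connection properties shown follow the first-generation Athena JDBC driver described at the link above; the jar path, region, credentials, and staging directory are placeholder assumptions for your environment.

import jaydebeapi

# Driver class, URL, and properties follow the first-generation Athena JDBC driver;
# region, credentials, jar path, and staging bucket are placeholders.
conn = jaydebeapi.connect(
    'com.amazonaws.athena.jdbc.AthenaDriver',
    'jdbc:awsathena://athena.us-east-1.amazonaws.com:443',
    {'user': '<AWS_ACCESS_KEY_ID>',
     'password': '<AWS_SECRET_ACCESS_KEY>',
     's3_staging_dir': 's3://my-athena-results-bucket/'},
    '/path/to/AthenaJDBC41.jar'
)

cursor = conn.cursor()
cursor.execute('SELECT count(*) FROM tweets')
print(cursor.fetchall())
cursor.close()
conn.close()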

Query 3: Top hashtags with at least 100 occurrences

SELECT ht.text,count(*)
FROM tweets
CROSS JOIN UNNEST (entities.hashtags) AS t(ht)
GROUP BY ht.text
HAVING count(*)>100
ORDER by count(*) desc

Query 4: Number of Tweets from verified accounts with the most followers

SELECT user.screen_name,user.name,max(user.followers_count),count(*)
FROM tweets
WHERE user.verified='true'
GROUP BY user.screen_name,user.name
ORDER BY cast(max(user.followers_count) as integer) DESC

Query 5: Top URL mentions in Tweets

SELECT url_extract_host(u.expanded_url),
count(*)
FROM tweets
CROSS JOIN UNNEST (entities.urls) AS t(u)
GROUP BY url_extract_host(u.expanded_url)
HAVING count(*)>100
ORDER by count(*) desc;

Query 6: Hashtags tweeted along with “Amazon”

WITH ht_list AS
(SELECT entities.hashtags
FROM tweets
CROSS JOIN UNNEST (entities.hashtags) AS t(ht)
WHERE ht.text LIKE 'amazon')
SELECT t AS "hashtag",count(*) AS "occurences" FROM ht_list
CROSS JOIN UNNEST (hashtags) AS t(t)
GROUP BY t
ORDER BY count(*) desc;

Visualize Twitter data from Quicksight using Athena

The following blog post provides information on querying Athena from Quicksight:

https://aws.amazon.com/blogs/big-data/derive-insights-from-iot-in-minutes-using-aws-iot-amazon-kinesis-firehose-amazon-athena-and-amazon-quicksight/

Quicksight currently does not support complex JSON types and expects columns to be among its supported data types:

http://docs.aws.amazon.com/quicksight/latest/user/data-source-limits.html

The dashboard built for this post displays potentially sensitive Tweets by language from the data set.


The underlying query:

Query 7: Find the number of Tweets by language and sensitive media content

SELECT lang, possibly_sensitive, count(*)
FROM tweets
GROUP BY lang, possibly_sensitive

Conclusion

Twitter analysis using Athena shows that the product can be leveraged for use cases involving complex data formats (unstructured/semi-structured), can be automated over JDBC connections, and can surface basic insights through Quicksight (although the current lack of support for arrays hinders deeper analytics).

Athena can be an excellent tool for “S3 as a data lake” use cases where the data is already staged in S3; as a serverless managed service, Athena takes precedence over previously used approaches like Presto/Hive/Impala on EMR.

Stay tuned for “Athena: Beyond the Basics – Part 2”.

Additional references:

List of SQL statements supported by Athena

http://docs.aws.amazon.com/athena/latest/ug/language-reference.html