
Working with Twitter (complex JSON) data set

Amazon Athena is a serverless interactive query service that allows analytics using standard SQL on data residing in S3. Before Athena, querying data sets on S3 required installing Hive/Presto/Hue or similar tools on top of the EMR service, or integrating with third-party partner products.

Athena also supports JDBC connectivity, so the managed service can be easily integrated with a wide variety of SQL and visualization tools.

https://aws.amazon.com/athena/

Many customers are interested in exploring Amazon Athena for their use cases and are looking for ways to optimize for performance and cost. As an APN Partner, NorthBay has been testing Athena against various customer use cases. This is a multi-part blog series to share our findings and give the audience a jumpstart on working with Amazon Athena.

Twitter use case

Unstructured and semi-structured data (typically JSON) is becoming the norm for Big Data sets. We chose Twitter data as the data set to validate how Athena handles complex JSON. This blog post covers querying Twitter data with Athena and executing complex queries against the data set.
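
For context, here is a heavily trimmed, illustrative sketch of a single tweet payload; real payloads carry many more fields, and the values below are placeholders:

{
  "id_str": "850006245121695744",
  "created_at": "Thu Apr 06 15:24:15 +0000 2017",
  "text": "Trying out Amazon Athena on streaming tweets",
  "user": {
    "id_str": "2244994945",
    "screen_name": "example_user",
    "followers_count": 1000
  },
  "entities": {
    "hashtags": [{"text": "aws"}, {"text": "athena"}]
  }
}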

The following is the architecture followed for the implementation:

(Architecture diagram)

– Configure Twitter for API access

– Configure Kinesis Firehose to stream the output to S3

– Configure and run Tweepy to read Twitter feed and stream to Kinesis Firehose

– Define the table schema in Athena (see the sketch after this list)

– Query Twitter data from Athena Query Editor

– Query Twitter data using JDBC connection

– Query Twitter data from Amazon QuickSight
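
As a hedged preview of the schema-definition and query steps, the sketch below defines a table over the Firehose output with a JSON SerDe and runs a query through the Athena API using boto3 rather than the console. The bucket names, result location, and trimmed-down column list are illustrative assumptions, not the exact schema for the full Twitter payload.

import boto3

athena = boto3.client('athena')

# Hive DDL over the Firehose output; `user` is backtick-escaped because it is reserved.
DDL = """
CREATE EXTERNAL TABLE IF NOT EXISTS tweets (
  id_str string,
  created_at string,
  text string,
  `user` struct<id_str:string, screen_name:string, followers_count:int>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://your-firehose-bucket/json/'
"""

def run_query(sql):
    # Athena writes query results to S3; this output location is a placeholder.
    response = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={'Database': 'default'},
        ResultConfiguration={'OutputLocation': 's3://your-athena-query-results/'}
    )
    return response['QueryExecutionId']

run_query(DDL)
# Reserved words are double-quoted in queries; struct fields use dot notation.
run_query('SELECT t."user".screen_name AS screen_name, count(*) AS tweet_count '
          'FROM tweets t GROUP BY t."user".screen_name '
          'ORDER BY tweet_count DESC LIMIT 10')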

Configure Twitter for API access

To create this platform, you will need an AWS account and a Twitter application. Sign in with your Twitter account and create a new application at https://apps.twitter.com/. Make sure your application is set for ‘read-only’ access. Next, choose Create My Access Token at the bottom of the Keys and Access Tokens tab. By this point, you should have four Twitter application keys: consumer key (API key), consumer secret (API secret), access token, and access token secret. Take note of these keys.

https://aws.amazon.com/blogs/big-data/building-a-near-real-time-discovery-platform-with-aws/
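
The feeder script shown later imports these four values from a small configuration module. A minimal sketch of that file (config.py, with placeholder values) might look like:

# config.py -- placeholder credentials for the Twitter application created above
consumer_key = 'YOUR_CONSUMER_KEY'
consumer_secret = 'YOUR_CONSUMER_SECRET'
access_token = 'YOUR_ACCESS_TOKEN'
access_secret = 'YOUR_ACCESS_TOKEN_SECRET'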

Configure Kinesis Firehose to stream the output to S3

Create a Kinesis Firehose Delivery Stream to receive the data and deliver it to S3.


Step 1: Configure Destination: Choose “Amazon S3” as the destination and either select an existing S3 bucket or choose “Create bucket” to create a new one for Firehose to persist the data.


Once the bucket is created, add a prefix for the data. In this case, a json/ prefix is added so that all JSON data lands under the same bucket/prefix.
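
Firehose prepends this prefix and then adds a UTC date/hour path and a generated file name, so objects land under keys such as json/2017/03/15/12/twitter-to-s3-1-2017-03-15-12-00-00-<random-suffix> (illustrative; the exact file name is generated by Firehose).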

Step 2: Configuration: Kinesis Firehose allows configuration of buffer size, buffer interval, compression, encryption, and IAM policies. Choose these values based on the streaming ingest frequency and the optimal size of the output files in S3.


Firehose then opens a new window to create the IAM role it uses to write to S3; we just have to click “Allow” without changing anything.

Step 3: Review:

Review the configuration and click “Create Delivery Stream”.


In the Firehose Delivery Stream console, we can see the newly created Delivery Stream with status “CREATING”. Once the status changes to “ACTIVE”, we can start using the delivery stream.
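
The same delivery stream can also be created programmatically. Below is a minimal sketch with boto3, assuming an existing bucket and an IAM role that Firehose is allowed to assume; both ARNs are placeholders:

import time
import boto3

firehose = boto3.client('firehose')

firehose.create_delivery_stream(
    DeliveryStreamName='twitter-to-s3',
    S3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role',  # placeholder
        'BucketARN': 'arn:aws:s3:::your-firehose-bucket',  # placeholder
        'Prefix': 'json/',
        # Flush to S3 every 5 MB or 300 seconds, whichever comes first
        'BufferingHints': {'SizeInMBs': 5, 'IntervalInSeconds': 300},
        'CompressionFormat': 'UNCOMPRESSED'
    }
)

# Poll until the stream leaves "CREATING" and becomes "ACTIVE"
while True:
    stream = firehose.describe_delivery_stream(DeliveryStreamName='twitter-to-s3')
    status = stream['DeliveryStreamDescription']['DeliveryStreamStatus']
    print(status)
    if status == 'ACTIVE':
        break
    time.sleep(10)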

Ingest Twitter feeds from the feeder system (Tweepy/Python)

http://docs.tweepy.org/en/v3.5.0/index.html

We need a stream producer/feeder system to publish streaming data to Kinesis Firehose. Tweepy is an open-source Python library that enables communication with the Twitter API. The following code can be run on an EC2 instance (with an IAM role that can write to Kinesis Firehose, and with the Twitter API credentials from the earlier step in a configuration file) to feed the stream we created earlier.

import tweepy
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
import time
import argparse
import string
import config
import boto3


def get_parser():
    """Get parser for command line arguments."""
    parser = argparse.ArgumentParser(description="Twitter Downloader")
    parser.add_argument("-q",
                        "--query",
                        dest="query",
                        help="Query/Filter",
                        default='-')
    parser.add_argument("-d",
                        "--data-dir",
                        dest="data_dir",
                        help="Output/Data Directory")
    return parser


class MyListener(StreamListener):
    """Custom StreamListener for streaming data."""

    def __init__(self, data_dir, query):
        super(MyListener, self).__init__()
        query_fname = format_filename(query)
        self.outfile = "%s/stream_%s.json" % (data_dir, query_fname)

    def on_data(self, data):
        try:
            result = send_record_to_firehose(data)
            print(result)
        except BaseException as e:
            print("Error on_data: %s" % str(e))
            time.sleep(5)
        return True

    def on_error(self, status):
        print("Error with status: " + str(status))
        if status == 420:
            print("You are being rate limited!")
        return True


def send_record_to_firehose(data):
    """Send a JSON record from Twitter to the Kinesis Firehose Delivery Stream.

    Arguments:
    data -- JSON payload from Twitter
    Return:
    dict -- response from Kinesis Firehose
    """
    client = boto3.client('firehose')
    response = client.put_record(
        DeliveryStreamName='twitter-to-s3',
        Record={
            'Data': data
        }
    )
    return response


def format_filename(fname):
    """Convert a file name into a safe string.

    Arguments:
    fname -- the file name to convert
    Return:
    String -- converted file name
    """
    return ''.join(convert_valid(one_char) for one_char in fname)


def convert_valid(one_char):
    """Convert a character into '_' if invalid.

    Arguments:
    one_char -- the char to convert
    Return:
    Character -- converted char
    """
    valid_chars = "-_.%s%s" % (string.ascii_letters, string.digits)
    if one_char in valid_chars:
        return one_char
    return '_'


if __name__ == '__main__':
    parser = get_parser()
    args = parser.parse_args()
    auth = OAuthHandler(config.consumer_key, config.consumer_secret)
    auth.set_access_token(config.access_token, config.access_secret)
    api = tweepy.API(auth)
    # Reconnect and keep tracking if the stream drops
    while True:
        try:
            twitter_stream = Stream(auth, MyListener(args.data_dir, args.query))
            twitter_stream.filter(track=[args.query])
        except Exception:
            continue
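
To run the feeder (the script name here is illustrative), pass a filter term and a local data directory:

python twitter_feeder.py -q "aws" -d ./data

Each matching tweet is forwarded to the twitter-to-s3 delivery stream and appears in S3 under the json/ prefix once the buffer conditions are met.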