Optimizing file formats and compression
In the earlier blog post Athena: Beyond the Basics – Part 1, we examined working with Twitter data and executing complex queries using Athena. In this article, we will look at the Athena pricing model, experiment with different file formats and compression techniques, analyze the results, and decide on the best price-to-performance solution for the current use case.
Athena Pricing
The Athena service is priced by the amount of data scanned when running queries. Selecting the appropriate file format and compression, and balancing those choices against the response times you expect from your queries, can yield considerable cost savings.
https://aws.amazon.com/athena/pricing/
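As a rough illustration, assuming the published rate of $5 per TB scanned (see the pricing page above for current figures): a query that scans 200 GB of raw JSON would cost about 200 / 1,024 × $5 ≈ $0.98, while the same query against a compressed, columnar copy that scans only 20 GB would cost roughly $0.10.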
The following sections share the process of converting the data set into multiple file formats and compression codecs so that we can measure cost and response time for the Twitter use case.
Converting File Formats and Compression
Athena currently doesn’t support inserting or updating data on S3. An EMR cluster (or a similar Hive environment) has to be used to create a new data set in the preferred file format from the existing data set. We are going to add EMR to the previous architecture to convert the data into different file formats and compression codecs.
The following link covers converting to multiple file formats in great detail:
http://docs.aws.amazon.com/athena/latest/ug/convert-to-columnar.html
The approach for converting to different file formats and querying from Athena is as follows:
- Create a Hive table with the relevant target format and compression:
CREATE EXTERNAL TABLE <table_name> (<column> <data_type>, ...)
STORED AS <file_format>
LOCATION 's3://<bucket>/<path>/'
TBLPROPERTIES ('<compression_property>'='<codec>');
- Populate the data into the new storage/compression format:
INSERT INTO TABLE <target_table> SELECT * FROM <source_table>;
- Create an Athena table based on the new data set stored on S3 (currently, the Athena catalog manager doesn’t share the Hive catalog, so the table has to be defined again in Athena); see the sketch after this list.
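As a minimal sketch of that last step, assuming the Parquet-with-Snappy copy from the sections below is written to a hypothetical location s3://<your-bucket>/parquet_snappy/ and abbreviating the column list (the real table from Part 1 has many more columns, including nested structs), the Athena DDL would look like:
-- Athena-side table over the converted data (illustrative subset of columns)
CREATE EXTERNAL TABLE parquet_snappy (
  id bigint,
  created_at string,
  text string,
  lang string
)
STORED AS PARQUET
LOCATION 's3://<your-bucket>/parquet_snappy/';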
The following code snippets are used to create multiple versions of the same data set for experimenting with Athena
JSON FORMAT:
- To convert from JSON to Snappy compression, we execute these commands in Hive:
SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapred.output.compression.type=BLOCK;
Then we create the Snappy external table exactly the same way we created the JSON table, but with a different name (tweets_snappy, for example) and a different S3 location, and insert from the JSON table into the Snappy table, as sketched below.
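A minimal sketch of that step, assuming the source table is named tweets and uses the Hive JsonSerDe, with a hypothetical bucket path and an abbreviated column list:
-- On EMR you may need to ADD JAR the hive-hcatalog-core jar for the JsonSerDe
CREATE EXTERNAL TABLE tweets_snappy (
  id bigint,
  created_at string,
  text string,
  lang string
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 's3://<your-bucket>/tweets_snappy/';

-- With the compression settings above, the files written by this INSERT are Snappy-compressed
INSERT OVERWRITE TABLE tweets_snappy
SELECT id, created_at, text, lang FROM tweets;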
- To convert from JSON to gzip compression, we execute these commands in Hive:
SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
SET mapred.output.compression.type=BLOCK;
Then we create the gzip external table exactly the same way we created the JSON table, but with a different name (tweets_gzip, for example) and a different S3 location, and insert from the JSON table into the gzip table (the same pattern as the Snappy sketch above, with only the codec setting changed).
TIP: Hive runs faster when it copies from a compressed source table, since it has to read less data and launch fewer map tasks.
PARQUET FORMAT:
- To convert from plain Parquet to Snappy-compressed Parquet (parquet_snappy), we add the following to the end of the CREATE Parquet table statement:
TBLPROPERTIES ("PARQUET.COMPRESS"="SNAPPY");
Then we insert into the parquet_snappy table from any previously created table that already holds the data (json, json_snappy, parquet, …).
- To convert from plain Parquet to gzip-compressed Parquet (parquet_gzip), we add the following to the end of the CREATE Parquet table statement:
TBLPROPERTIES ("PARQUET.COMPRESS"="GZIP");
Then we insert into the parquet_gzip table from any previously created table that already holds the data (json, json_snappy, parquet, …). A full sketch of the Parquet conversion is shown below.
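Putting it together, a minimal sketch of the Parquet-with-Snappy conversion in Hive, assuming a hypothetical bucket path, an abbreviated column list, and the table property exactly as shown above:
CREATE EXTERNAL TABLE parquet_snappy (
  id bigint,
  created_at string,
  text string,
  lang string
)
STORED AS PARQUET
LOCATION 's3://<your-bucket>/parquet_snappy/'
TBLPROPERTIES ("PARQUET.COMPRESS"="SNAPPY");

-- Populate from the existing JSON table
INSERT OVERWRITE TABLE parquet_snappy
SELECT id, created_at, text, lang FROM tweets;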
ORC FORMAT:
- To convert from plain ORC to Snappy-compressed ORC (orc_snappy), we add the following to the end of the CREATE ORC table statement:
TBLPROPERTIES ("ORC.COMPRESS"="SNAPPY");
Then we insert into the orc_snappy table from any previously created table that already holds the data (json, json_snappy, parquet, …).
- To convert from plain ORC to gzip-compressed ORC (orc_gzip), we add the following to the end of the CREATE ORC table statement:
TBLPROPERTIES ("ORC.COMPRESS"="GZIP");
Then we insert into the orc_gzip table from any previously created table that already holds the data (json, json_snappy, parquet, …). The ORC conversion follows the same pattern as the Parquet sketch above; a brief sketch is shown below.
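A brief sketch of the ORC version under the same assumptions (hypothetical bucket path, abbreviated column list):
CREATE EXTERNAL TABLE orc_snappy (
  id bigint,
  created_at string,
  text string,
  lang string
)
STORED AS ORC
LOCATION 's3://<your-bucket>/orc_snappy/'
TBLPROPERTIES ("ORC.COMPRESS"="SNAPPY");

-- Populate from the existing JSON table
INSERT OVERWRITE TABLE orc_snappy
SELECT id, created_at, text, lang FROM tweets;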
Based on the above code snippets, tables are created for JSON, ORC, and Parquet, each without compression and with Snappy and gzip compression, for a total of 9 tables.
Queries for Analysis
The following queries from the earlier blog post will be used to compare data scanned and response times.
Query 1: Total records in the data set
SELECT count(*) FROM tweets
Query 2: Get all records
SELECT * FROM tweets
Query 3: Top Hashtags with at least 100 occurrences
SELECT ht.text,
count(*)
FROM tweets
CROSS JOIN UNNEST (entities.hashtags) AS t(ht)
GROUP BY ht.text
HAVING count(*)>100
ORDER by count(*) desc;
Query 4: Number of tweets from verified accounts with most followers
SELECT user.screen_name,user.name,max(user.followers_count),count(*)
FROM tweets
WHERE user.verified='true'
GROUP BY user.screen_name,user.name
ORDER BY cast(max(user.followers_count) as integer) DESC
Query 5: Top URL mentions in Tweets
SELECT url_extract_host(u.expanded_url),
count(*)
FROM tweets
CROSS JOIN UNNEST (entities.urls) AS t(u)
GROUP BY url_extract_host(u.expanded_url)
HAVING count(*)>100
ORDER by count(*) desc;
Query 6: Hashtags tweeted along with “Amazon”
WITH ht_list AS
(SELECT entities.hashtags
FROM tweets
CROSS JOIN UNNEST (entities.hashtags) AS t(ht)
WHERE ht.text LIKE 'amazon')
SELECT t AS "hashtag",count(*) AS "occurences" FROM ht_list
CROSS JOIN UNNEST (hashtags) AS t(t)
GROUP BY t
ORDER BY count(*) desc;
Query 7: Find the number of tweets by language and sensitive media content
SELECT lang, possibly_sensitive, count(*)
FROM tweets
GROUP BY lang, possibly_sensitive
Query execution metrics
The queries are executed on Athena from the Athena query console. Response time and data scanned metrics are captured for each execution. Although the data scanned is the same across multiple executions of a given query and table combination, the response time varied by a few milliseconds and sometimes by a couple of seconds, so we have taken the average response time over multiple executions for the following output.
Testing Analysis
- For the current use case and the identified set of queries, ORC provides the best price to performance ($0.55 for executing all queries, with an aggregate response time within 30 seconds). This can be attributed to the columnar storage format and the query patterns used for this blog post.
- Converting to ORC or Parquet can reduce Athena costs by over 90%.
- Compressing the raw JSON with Snappy or gzip can also significantly reduce costs (by over 80%), but response times did not improve.
- Snappy/gzip compression on Parquet and ORC offers limited additional benefit, as those file formats are already compressed.
- Avoid using the raw text format of the data set when querying with Athena.
Conclusion
This blog post is intended to share an approach for testing and experimenting with different file formats and compression techniques to optimize for cost and performance. The findings for the current use case may or may not apply to different queries and use cases. We recommend testing the different options before finalizing formats and compression for very large data sets and data lake scenarios.
This concludes “Athena: Beyond the Basics – Part 2”.
Additional references:
List of SQL statements supported by Athena
https://aws.amazon.com/blogs/big-data/analyzing-data-in-s3-using-amazon-athena/
https://orc.apache.org/
https://parquet.apache.org/