Prepending Rows to a CSV in Amazon S3

+++
title = "How I used Python and Boto3 to modify csv files in AWS S3"
date = "2017-07-21"
autoThumbnailImage = false
clearReading = true
comments = true
metaAlignment = "center"
showPagination = true
showSocial = true
showTags = true
tags = ["python", "amazon", "s3", "generators"]
keywords = ["tech", "python", "amazon", "s3", "generators"]
categories = ["dev"]
draft = true
+++

**Note:** Even though Python is mentioned specifically, that’s just the language I used and the language the code examples are in. This could be done in any language, since the main functionality we’ll be using is built into Amazon S3’s API.

I’m also not saying this is a good solution… just the one I came up with that seems to work pretty well.

At work we developed an app that builds dynamic SQL queries using SQLAlchemy. The user can build the query they want and get the results in a CSV file. We also make use of Redshift’s ability to unload query results to S3: if a query returns more than X rows, we can just have Redshift run it and store the CSV file in S3 for us. There is a slight problem with this. When a query isn’t unloaded, we can put in the column headers ourselves and then serve the CSV file. But when a query is unloaded, only the results are in the CSV and the column headers are left out.

Boto3

Boto3 is the library to use for AWS interactions with Python. The docs are not bad at all and the API is intuitive. At its core, Boto3 is just a nice Python wrapper around the AWS API. Even though Boto3 is Python-specific, the underlying API calls can be made from any library in any language.

Since only the larger queries were unloaded to a CSV file, these CSV files were large. Very large. Large enough to throw out-of-memory errors in Python.

The whole process had to look something like this:

Download the file from S3 -> Prepend the column header -> Upload the file back to S3

Let’s look at how to download a file in S3 with Boto3.

Downloading the File

As I mentioned, Boto3 has a very simple API, especially for Amazon S3. If you’re not familiar with S3, just think of it as Amazon’s unlimited FTP service or Amazon’s Dropbox. The folders are called buckets and “filenames” are keys. Say you wanted to download a file in S3 to a local file using Boto3. Here’s a pretty simple approach from the docs using the Object class:

import boto3

s3 = boto3.resource('s3')
obj = s3.Object('mybucket', 'hello.txt')
# download_file writes to the local path and returns None
obj.download_file('/tmp/hello.txt')

Notice there’s no authentication information? Boto3 looks up credentials on its own, for example from environment variables or the shared ~/.aws/credentials file.

As I mentioned before, these files are large, so I couldn’t just use download_file: the process would hit a MemoryError before the file could finish processing.

After a little bit of searching, I learned that calling .get() on an s3.Object() retrieves the object’s information and metadata from S3 as a dict. One of the keys in that dict is 'Body', which holds the contents of the file. Calling .get()['Body'] doesn’t download the file immediately, though. What you get is something called a StreamingBody instance. It’s a lazy, file-like stream (not strictly a generator), and it comes with a .read(num_of_bytes) method! So it’s really easy to chunk the download and control how many bytes you want at once. I’m going to pretend I have a file in S3. It lives in the bucket ‘mybucket’, it’s named ‘hello.txt’, and its contents are this: hello.

import boto3

s3 = boto3.resource('s3')
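# .get()['Body'] returns a StreamingBody, a lazy file-like stream
fileobj = s3.Object('mybucket', 'hello.txt').get()['Body']
# Each read pulls the next byte off the stream (bytes in Python 3)
fileobj.read(1)  # b'h'
fileobj.read(1)  # b'e'
fileobj.read(1)  # b'l'
fileobj.read(1)  # b'l'
fileobj.read(1)  # b'o'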

By chunking the download, we can avoid memory errors on the download part completely. In my case, I didn’t want to store anything locally. I wanted the download, the modifications, and the upload to happen all around the same time. You can definitely read more than one byte at a time, by the way! (One byte was just easier to demonstrate.)
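The one-byte reads above generalize to any chunk size. Here is a minimal sketch of a chunked reader. `iter_chunks` is a hypothetical helper name, and `io.BytesIO` stands in for the `StreamingBody` so the snippet runs without touching S3. It assumes only that the stream has a `.read(size)` method that returns empty bytes once the data runs out.

```python
import io


def iter_chunks(stream, chunk_size):
    """Yield successive chunks of up to chunk_size bytes from a file-like stream."""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:  # empty bytes means the stream is exhausted
            return
        yield chunk


# io.BytesIO is a stand-in for s3.Object(...).get()['Body']
fake_body = io.BytesIO(b'hello')
chunks = list(iter_chunks(fake_body, 2))
print(chunks)  # [b'he', b'll', b'o']
```

Only one chunk is held in memory at a time, so the chunk size, not the file size, sets the memory ceiling.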

Prepending the Column Headers

I love generators. I love them so much! They’re almost always an elegant solution. We know that we can read the file we want from S3 a chunk at a time, which immediately told me that I could wrap those reads in a generator and manipulate what it yields. Let’s create a higher-level function that will act as our file stream.

def file_stream():
    fileobj = s3.Object('mybucket', 'hello.txt').get()['Body']
    for x in range(5):
        yield fileobj.read(1)

This function grabs the object from S3 and yields it one chunk at a time. With a little modification it could yield the headers first, then the file. But what if I just checked whether the iteration was the first one, and yielded the column headers plus the first chunk of the file if it was? Let’s try it:

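def file_stream():
    # Bytes, because StreamingBody.read() returns bytes
    data_to_prepend = b'oh '
    fileobj = s3.Object('mybucket', 'hello.txt').get()['Body']
    for idx in range(5):
        chunk = fileobj.read(1)
        # The first iteration is idx 0: glue the prepended data onto it
        if idx == 0:
            yield data_to_prepend + chunk
        else:
            yield chunk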

The output of this would look like this: oh hello.
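For real column headers, the prepended data would be a CSV header row rather than 'oh '. Here is a rough sketch of that idea, assuming comma-delimited output. The names `header_row` and `with_header`, the column names, and the sample rows are all made up for illustration, and a plain list of byte chunks stands in for the S3 stream.

```python
import csv
import io


def header_row(columns):
    """Encode column names as a single CSV line, quoting where needed."""
    buf = io.StringIO()
    csv.writer(buf, lineterminator='\n').writerow(columns)
    return buf.getvalue().encode('utf-8')


def with_header(columns, chunks):
    """Yield chunks unchanged, except the header is glued onto the first one."""
    header = header_row(columns)
    for idx, chunk in enumerate(chunks):
        if idx == 0:
            yield header + chunk
        else:
            yield chunk


# Hypothetical data: two chunks of an unloaded result set
body_chunks = [b'1,alice\n2,', b'bob\n']
result = b''.join(with_header(['id', 'name'], body_chunks))
print(result.decode())
```

Gluing the header onto the first chunk, rather than yielding it on its own, keeps the number of chunks unchanged. One caveat of this sketch: if the stream yields no chunks at all, the header is never emitted.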

Uploading the File Back to S3

Amazon S3 offers an API for multipart uploads, which essentially lets us upload a single file in multiple parts. Boto3 supports this as well. The process is a little more complicated, but still not hard!

import boto3

s3 = boto3.resource('s3')
# The multipart calls live on the low-level client, not on the resource
s3_client = s3.meta.client
bucket_name = 'mybucket'
key_name = 'hello.txt'
# S3 rejects parts smaller than 5 MiB (except the last one), so read big chunks
chunk_size = 8 * 1024 * 1024


def file_stream():
    data_to_prepend = b'oh '
    fileobj = s3.Object(bucket_name, key_name).get()['Body']
    # Glue the prepended data onto the first chunk only
    chunk = data_to_prepend + fileobj.read(chunk_size)
    while chunk:
        yield chunk
        chunk = fileobj.read(chunk_size)


# Store the new object with .temp appended to the name
temp_key = key_name + ".temp"
# we have to keep track of all of our parts
part_info_dict = {'Parts': []}
# start the multipart_upload process
multi_part_upload = s3_client.create_multipart_upload(Bucket=bucket_name, Key=temp_key)

# Part indexes are required to start at 1
for part_index, chunk in enumerate(file_stream(), start=1):
    # store the return value from upload_part for later
    part = s3_client.upload_part(
        Bucket=bucket_name,
        Key=temp_key,
        # PartNumbers need to be in order and unique
        PartNumber=part_index,
        # This 'UploadId' is part of the dict returned in multi_part_upload
        UploadId=multi_part_upload['UploadId'],
        # The chunk of the file we're streaming.
        Body=chunk,
    )

    # PartNumber and ETag are needed
    part_info_dict['Parts'].append({
        'PartNumber': part_index,
        # You can get this from the return of the uploaded part that we stored earlier
        'ETag': part['ETag']
    })

# This is what AWS needs to finish the multipart upload process
completed_ctx = {
    'Bucket': bucket_name,
    'Key': temp_key,
    'UploadId': multi_part_upload['UploadId'],
    'MultipartUpload': part_info_dict
}

# Complete the upload. This triggers Amazon S3 to rebuild the file for you.
# No need to manually stitch all of the parts together ourselves!
s3_client.complete_multipart_upload(**completed_ctx)
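One thing the walkthrough above leaves out is cleanup: if a part fails halfway through, the parts already uploaded stay in S3 until the upload is aborted. Below is a sketch of the same steps wrapped in a function that aborts on error. `upload_in_parts` is a hypothetical name, `client` is assumed to be a `boto3.client('s3')` (or anything exposing the same four methods), and `chunks` is any iterable of byte strings, such as `file_stream()`.

```python
def upload_in_parts(client, bucket, key, chunks):
    """Upload chunks as one object via a multipart upload; abort if anything fails."""
    upload_id = client.create_multipart_upload(Bucket=bucket, Key=key)['UploadId']
    parts = []
    try:
        # Part numbers start at 1
        for part_number, chunk in enumerate(chunks, start=1):
            response = client.upload_part(
                Bucket=bucket,
                Key=key,
                PartNumber=part_number,
                UploadId=upload_id,
                Body=chunk,
            )
            parts.append({'PartNumber': part_number, 'ETag': response['ETag']})
        return client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts},
        )
    except Exception:
        # Discard the parts uploaded so far, then re-raise the original error
        client.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        raise
```

With real credentials this could be called as `upload_in_parts(boto3.client('s3'), 'mybucket', 'hello.txt.temp', file_stream())`. Keep in mind that S3 rejects parts smaller than 5 MiB (except the last one), so real chunks need to be far larger than the single bytes used for demonstration.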

fin

That process worked great and is still used daily in our app. As I mentioned, I’m certain there are many other ways to solve this problem and I’d love to hear what you think!
