350000 rows, 133 cols... From a huge CSV to DynamoDB (without breaking the piggy bank).
In this post I will explain how to:
- Parse a CSV file and extract only certain columns
- Create a table in DynamoDB
- Insert all the data with an adaptive algorithm in order to make the most of the provisioned capacity
- Reduce the capacity once the insertion is done.
Exploring the problem: AWS Billing
In a previous post I explained how I was using DynamoDB to store a lot of data about AWS billing.
On top of the API that deals with products and offers, AWS can provide a “billing report”. Those reports are delivered to an Amazon S3 bucket in CSV format at least once a day.
The rows of the CSV are organized in topics as described here.
Each line of the CSV represents an item that is billed, but every resource is made of several billable items. On EC2, for example, you pay for the “compute”, the bandwidth, the volumes, etc.
I would like to use and understand this file to optimize the costs: a kind of BI.
AWS says that you can import the file into Excel (or a similar tool)… That could be a solution, but with a lot of resources the file gets fat (more than 100,000 lines, several times a day, for my client). I have decided to use DynamoDB to store all the information, so it will be easy to perform an extract and generate a dashboard. In this post, I will show some Go techniques I have used to achieve that.
Step 1: Parsing the CSV
As I explained, the CSV file is made of more than a hundred columns. The columns are identified in the first row of the CSV. I will store each row in a Go struct.
To parse it easily, I am using custom csv tags on the struct fields. The tag value corresponds to the header name to look for.
For example:
type test struct {
	ID   string `csv:"myid"`
	Name string `csv:"prenom"`
	Last string `csv:"nom"`
	Test string `csv:"nonexitent"`
}
Then I read the first row of the CSV file and range over the struct fields to fill a map, with the field name as key and the column index as value. I set -1 if the header is not found:
// et is the reflect.Type of the struct and header is the first row of the CSV.
var headers = make(map[string]int, et.NumField())
for i := 0; i < et.NumField(); i++ {
	// Find the index of the column whose name matches the field's csv tag.
	headers[et.Field(i).Name] = func(element string, array []string) int {
		for k, v := range array {
			if v == element {
				return k
			}
		}
		return -1
	}(et.Field(i).Tag.Get("csv"), header)
}
Then I can parse the CSV file and fill a channel with one object per row… See the full example here
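To give a rough idea of what that reading loop can look like, here is a minimal sketch (not the exact code of the gist). readCSV is a hypothetical helper: it takes a *csv.Reader on the billing file plus the headers map built above, fills one test struct per row through reflection, and sends it on the returned channel.
// readCSV is an illustrative helper, not the code of the original gist.
// It assumes the test struct and the headers map (field name -> column index)
// defined above; r must already be positioned after the header row.
func readCSV(r *csv.Reader, headers map[string]int) <-chan *test {
	c := make(chan *test)
	go func() {
		defer close(c)
		for {
			record, err := r.Read()
			if err == io.EOF {
				return
			}
			if err != nil {
				log.Printf("Cannot read the CSV: %v", err)
				continue
			}
			item := &test{}
			ev := reflect.ValueOf(item).Elem()
			for i := 0; i < ev.NumField(); i++ {
				// Skip fields whose header was not found (-1) or is out of range.
				col := headers[ev.Type().Field(i).Name]
				if col < 0 || col >= len(record) {
					continue
				}
				ev.Field(i).SetString(record[col])
			}
			c <- item
		}
	}()
	return c
}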
Step 2: Creating the table in DynamoDB
This step is “easy”. I will create a table with a partition key and a sort key.
For the example, the partition key is a string named Key. The sort key is also a string named SortKey.
AttributeDefinitions: []*dynamodb.AttributeDefinition{
	{
		AttributeName: aws.String("Key"),
		AttributeType: aws.String("S"),
	},
	{
		AttributeName: aws.String("SortKey"),
		AttributeType: aws.String("S"),
	},
},
KeySchema: []*dynamodb.KeySchemaElement{
	{
		AttributeName: aws.String("Key"),
		KeyType:       aws.String("HASH"),
	},
	{
		AttributeName: aws.String("SortKey"),
		KeyType:       aws.String("RANGE"),
	},
},
I will set an initial provisioning of 600. This would cost a lot of money, so I will reduce it later to save on the bill. The high provisioning is needed; otherwise it would take hours to ingest the CSV.
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
	ReadCapacityUnits:  aws.Int64(5),
	WriteCapacityUnits: aws.Int64(300),
},
The code for creating the table is here
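For reference, here is a minimal sketch of how those fragments fit together in a single CreateTable call. This is an illustration, not necessarily the exact code of the linked repository: createTable is a hypothetical helper, and svc is assumed to be an already configured *dynamodb.DynamoDB client.
// createTable is an illustrative helper that assembles the fragments above
// into one CreateTableInput and waits for the table to become ACTIVE.
func createTable(svc *dynamodb.DynamoDB, tableName string) error {
	_, err := svc.CreateTable(&dynamodb.CreateTableInput{
		TableName: aws.String(tableName),
		AttributeDefinitions: []*dynamodb.AttributeDefinition{
			{AttributeName: aws.String("Key"), AttributeType: aws.String("S")},
			{AttributeName: aws.String("SortKey"), AttributeType: aws.String("S")},
		},
		KeySchema: []*dynamodb.KeySchemaElement{
			{AttributeName: aws.String("Key"), KeyType: aws.String("HASH")},
			{AttributeName: aws.String("SortKey"), KeyType: aws.String("RANGE")},
		},
		ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
			ReadCapacityUnits:  aws.Int64(5),
			WriteCapacityUnits: aws.Int64(300),
		},
	})
	if err != nil {
		return err
	}
	// Block until the table is actually usable.
	return svc.WaitUntilTableExists(&dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	})
}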
Step 3: Inserting the data
The structs are read from the channel I have created previously.
Each object is encoded to a DynamoDB-compatible item thanks to the MarshalMap function of the helper library github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute.
To make the struct's ID field match the Key attribute of the table, I am using dynamodbav tags:
type test struct {
	ID   string `csv:"myid" dynamodbav:"Key"`
	Name string `csv:"prenom" dynamodbav:"SortKey"`
	Last string `csv:"nom" dynamodbav:"Last,omitempty"`
	Test string `csv:"nonexitent"`
}

...

for v := range c {
	item, err := dynamodbattribute.MarshalMap(v)
	if err != nil {
		log.Printf("Cannot marshal %v (%v)", v, err)
		continue
	}
	params := &dynamodb.PutItemInput{
		Item:      item,
		TableName: aws.String(tableName),
	}
	if _, err := svc.PutItem(params); err != nil {
		log.Printf("Cannot insert %v (%v)", v, err)
	}
}
Going concurrent
I will add a touch of concurrency: I will use a maximum of 20 goroutines to send items to DynamoDB simultaneously. This is an empirical choice.
I am using a “guard” channel with a buffer of 20. The buffer is filled with an empty struct whenever an item is received on the main communication channel. I then launch a goroutine that inserts the item into DynamoDB and consumes one element from the guard channel when done.
Sending to the guard channel blocks when it is full, therefore I am sure that at most 20 goroutines run at the same time:
guard := make(chan struct{}, 20)
for v := range c {
	guard <- struct{}{}
	go func(v *test) {
		// Free one slot in the guard channel when the insertion is done.
		defer func() { <-guard }()
		item, err := dynamodbattribute.MarshalMap(v)
		if err != nil {
			log.Printf("Cannot marshal %v (%v)", v, err)
			return
		}
		params := &dynamodb.PutItemInput{
			Item:      item,
			TableName: aws.String(tableName),
		}
		if _, err := svc.PutItem(params); err != nil {
			log.Printf("Cannot insert %v (%v)", v, err)
		}
	}(v)
}
Using a backoff algorithm
The problem with this implementation is that it can exceed the provisioned write capacity.
Therefore the rejected items must be resent. Of course, I could simply check for the error dynamodb.ErrCodeProvisionedThroughputExceededException and immediately resend the failed item.
But this may lead to terrible performance. The AWS documentation recommends an exponential backoff algorithm to optimize the writes (cf. the AWS documentation).
Wikipedia gives a good explanation of exponential backoff, but to keep it simple: the idea is to wait longer and longer between retries, which reduces the insertion rate until it matches what the table can actually absorb.
I am using a Go implementation found on GitHub, written by Cenkalti (github.com/cenkalti/backoff).
For now, I return the error (which triggers a retry) only in the case of dynamodb.ErrCodeProvisionedThroughputExceededException:
backoff.Retry(func() error {
	// Try to put the item, discarding the result.
	_, err := svcDB.PutItem(params)
	if err != nil {
		// Returning the error tells the backoff library to wait and retry.
		if err.(awserr.Error).Code() == dynamodb.ErrCodeProvisionedThroughputExceededException {
			return err
		}
		// TODO: Special case...
		log.Printf("Error inserting %v (%v)", v, err)
	}
	// Any other outcome is final: do not retry.
	return nil
}, backoff.NewExponentialBackOff())
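One detail that is easy to forget: before moving to the next step, the program must know that all the goroutines have finished. Here is a small sketch of one way to do it (my own assumption, not necessarily what the gist does), pairing the guard channel with a sync.WaitGroup; insertWithBackoff is a hypothetical helper standing for the PutItem wrapped in backoff.Retry shown above.
var wg sync.WaitGroup
guard := make(chan struct{}, 20)
for v := range c {
	guard <- struct{}{}
	wg.Add(1)
	go func(v *test) {
		defer wg.Done()
		defer func() { <-guard }()
		// insertWithBackoff is hypothetical: the PutItem + backoff.Retry logic above.
		insertWithBackoff(v)
	}(v)
}
// The channel is closed and drained: wait for the in-flight goroutines
// before reducing the provisioned capacity.
wg.Wait()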
Step 4: Updating the table and reducing the write capacity
Once the insert is done, to avoid a huge bill, I reduce the provisioned capacity of the table.
This is done with an UpdateTable request:
params := &dynamodb.UpdateTableInput{
	TableName: aws.String(tableName), // Required
	ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
		ReadCapacityUnits:  aws.Int64(10), // Required
		WriteCapacityUnits: aws.Int64(1),  // Required
	},
}
resp, err := svc.UpdateTable(params)
if err != nil {
	log.Fatalf("Cannot update the table: %v", err)
}
log.Println(resp)
Conclusion: it works
It took me half an hour to process and insert 350,000 lines (with 133 columns each) into DynamoDB from my laptop.
I can see on the graphs that the adaptive algorithm works:
Now I can analyse the data to find a proper way to optimize the AWS bill for my client.
The full example is available as a gist