I am seeing some really strange behaviour on Amazon Athena.
I create a test table with:
CREATE EXTERNAL TABLE IF NOT EXISTS athena_test_table( id int, c1 string, c2 string, c3 string)
LOCATION 's3://s3-path/athena/'
Insert some data:
insert into "athena_test_table"
values (1,'val1a','val1b','val1c'), (2,'val2a','val2b','val2c'), (3,'val3a','val3b','val3c')
A SELECT on the table returns the expected result:
# id c1 c2 c3
1 1 val1a val1b val1c
2 2 val2a val2b val2c
3 3 val3a val3b val3c
If I query this data from a C# application (AWSSDK.Core 3.7.106.2 and AWSSDK.Athena 3.7.104.9), or even just list the tables in the database with SHOW TABLES IN TESTDB, junk or empty rows are added to my athena_test_table:
# id c1 c2 c3
1
2
3 1 val1a val1b val1c
4 2 val2a val2b val2c
5 3 val3a val3b val3c
or
# id c1 c2 c3
1
2
3
4
5
6
7
8
9 1 val1a val1b val1c
10 2 val2a val2b val2c
11 3 val3a val3b val3c
12
13
14
15
16 "#
17 "#
18
and the table grows again with every further request I send.
C# code to retrieve data from Amazon Athena:
private async Task<ResultSet> SendAndResponseToAthena(string query, AthenaParameters data, ProxyData proxyData)
{
    try
    {
        Uri.TryCreate(proxyData.Url, UriKind.Absolute, out var proxyUrl);
        var clientConfig = new AmazonAthenaConfig
        {
            ProxyHost = proxyUrl?.Host,
            ProxyPort = proxyUrl?.Port ?? 0,
            ProxyCredentials = new NetworkCredential(proxyData.Username, proxyData.Password),
            RegionEndpoint = RegionEndpoint.GetBySystemName(data.Region)
        };
        var client = new AmazonAthenaClient(data.UserName, data.Password, clientConfig);
        var executionId = await SubmitAthenaQuery(query, client, data);
        await WaitForQueryToComplete(client, executionId);
        var resultSet = await ProcessResult(client, executionId);
        return resultSet;
    }
    catch (Exception ex)
    {
        LogException(ex.Message, ex, this);
        throw new Exception(ex.ToString());
    }
}
private async Task<string> SubmitAthenaQuery(string query, AmazonAthenaClient client, AthenaParameters data)
{
    var executionRequest = new StartQueryExecutionRequest
    {
        QueryString = query,
        ResultConfiguration = new ResultConfiguration { OutputLocation = data.S3Location },
        QueryExecutionContext = new QueryExecutionContext { Database = data.DatabaseName }
    };
    try
    {
        var response = await client.StartQueryExecutionAsync(executionRequest);
        var executionId = response.QueryExecutionId;
        LogInfo($"Submit Athena Query: {query} with Execution Id: {executionId}.", this);
        return executionId;
    }
    catch (Exception ex)
    {
        LogException(ex.Message, ex, this);
        throw new Exception(ex.ToString());
    }
}
private async Task WaitForQueryToComplete(AmazonAthenaClient client, string executionId)
{
    var getQueryExecutionRequest = new GetQueryExecutionRequest { QueryExecutionId = executionId };
    var isQueryStillRunning = true;
    while (isQueryStillRunning)
    {
        var getQueryExecutionResponse = await client.GetQueryExecutionAsync(getQueryExecutionRequest);
        var queryState = getQueryExecutionResponse.QueryExecution.Status.State.ToString();
        var changeReason = getQueryExecutionResponse.QueryExecution.Status.StateChangeReason;
        if (queryState.Equals(QueryExecutionState.FAILED.ToString()))
        {
            throw new Exception($"The Amazon Athena query failed to run with error message: {changeReason}");
        }
        if (queryState.Equals(QueryExecutionState.CANCELLED.ToString()))
        {
            throw new Exception($"The Amazon Athena query was cancelled with change reason: {changeReason}");
        }
        if (queryState.Equals(QueryExecutionState.SUCCEEDED.ToString()))
        {
            isQueryStillRunning = false;
        }
        else
        {
            // sleep an amount of time (500ms) before retrying again
            await Task.Delay(500);
        }
    }
}
private async Task<ResultSet> ProcessResult(AmazonAthenaClient client, string executionId)
{
    var queryResultsRequest = new GetQueryResultsRequest { QueryExecutionId = executionId };
    try
    {
        LogInfo($"Retrieve the results for Execution Id: {executionId}.", this);
        var results = await client.GetQueryResultsAsync(queryResultsRequest);
        var pageResult = results;
        while (pageResult.NextToken != null) // process pagination (>1000 entries)
        {
            var request = new GetQueryResultsRequest { QueryExecutionId = executionId, NextToken = pageResult.NextToken };
            pageResult = await client.GetQueryResultsAsync(request);
            results.ResultSet.Rows.AddRange(pageResult.ResultSet.Rows);
        }
        return results.ResultSet;
    }
    catch (Exception ex)
    {
        LogException(ex.Message, ex, this);
        throw new Exception(ex.ToString());
    }
}
Any ideas what could explain this behaviour and how to fix it?
From Working with query results, recent queries, and output files - Amazon Athena:
Amazon Athena automatically stores query results and metadata information for each query that runs in a query result location that you can specify in Amazon S3. If necessary, you can access the files in this location to work with them. You can also download query result files directly from the Athena console.
It would appear that you have configured your output location (a directory) to point to the same location as your source data. Therefore, every time that a query runs in Amazon Athena, a new output file is created in that location.
When Amazon Athena runs a query, it reads every file in the location specified for the table, including subdirectories. Therefore, these output files are being included as source data for the queries.
Solution: Change the output location to a path that is not inside the table's LOCATION. It is currently set here:
ResultConfiguration = new ResultConfiguration { OutputLocation = data.S3Location },
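To make the mistake harder to repeat, a small guard could reject an output location that falls inside the table's data location before the query is submitted. The following is only a sketch: the class AthenaLocationGuard, its methods, and the s3://s3-path/athena-results/ prefix are hypothetical names chosen for illustration, not part of the AWS SDK, and the check is a plain string comparison on S3 URIs.

```csharp
using System;

// Hypothetical helper (not part of the AWS SDK): checks that the Athena
// query result location does not overlap with a table's data location.
public static class AthenaLocationGuard
{
    // Ensure the URI ends with exactly one '/', so that
    // "s3://bucket/athena" is not treated as a prefix of "s3://bucket/athena-results/".
    private static string Normalize(string s3Uri) => s3Uri.TrimEnd('/') + "/";

    // True when outputLocation is the table location itself or lies beneath it.
    public static bool IsInsideTableLocation(string outputLocation, string tableLocation) =>
        Normalize(outputLocation).StartsWith(Normalize(tableLocation), StringComparison.Ordinal);

    // Throws if query results would be written into the table's own data prefix.
    public static void EnsureSeparate(string outputLocation, string tableLocation)
    {
        if (IsInsideTableLocation(outputLocation, tableLocation))
        {
            throw new ArgumentException(
                $"Output location '{outputLocation}' is inside table location '{tableLocation}'; " +
                "Athena would read its own result files as table data.");
        }
    }
}
```

With the table from the question, AthenaLocationGuard.EnsureSeparate("s3://s3-path/athena/", "s3://s3-path/athena/") would throw, while an assumed separate prefix such as "s3://s3-path/athena-results/" would pass. The call could sit in SubmitAthenaQuery just before the StartQueryExecutionRequest is built, with data.S3Location as the first argument.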