There are plenty of examples of creating Azure Stream Analytics Query jobs, but most of them use the wizard in the management portal.
I found this example, which has most of the info needed; but unfortunately, it authenticates by popping up a login form. Not ideal for a server application!
My next attempt was using a cert, but it turns out the new Azure Resource Management APIs will only work with Azure AD. I started working my way through this example:
var authContext = new AuthenticationContext("https://login.microsoftonline.com/{tenantId}/oauth2/token");
var clientCredential = new ClientCredential(clientId, appKey);
var result = authContext.AcquireToken("https://management.core.windows.net/", clientCredential);
var creds = new TokenCloudCredentials(subscriptionId, result.AccessToken);
var client = new StreamAnalyticsManagementClient(creds);
I could acquire a token without any problem, but when I came to try an create an SA query job I got an auth error:
AuthorizationFailed: The client 'REDACTED' with object id 'REDACTED' does not have authorization to perform action 'Microsoft.StreamAnalytics/streamingjobs/write' over scope '/subscriptions/REDACTED/resourcegroups/REDACTED/providers/Microsoft.StreamAnalytics/streamingjobs/REDACTED'
With some help from SO, it became clear that my AD app didn’t have the necessary permissions. There currently isn’t a built-in role with sufficient authoritar; so we had to drop the hammer and use the “Contributor” role, that can do just about anything:
New-AzureRoleAssignment -ServicePrincipalName {azureADAppUri} -RoleDefinitionName Contributor -Scope /subscriptions/{tenantId}/resourcegroups/{resourceGroupId}/providers/Microsoft.StreamAnalytics/streamingjobs/*
With that in place, creating the query job is trivial:
var jobCreateParameters = new JobCreateOrUpdateParameters
{
Job = new Job
{
Name = streamAnalyticsJobName,
Location = "North Europe",
Properties = new JobProperties
{
EventsOutOfOrderPolicy = EventsOutOfOrderPolicy.Adjust,
Sku = new Sku
{
Name = "Standard"
}
}
}
};
client.StreamingJobs.CreateOrUpdate(resourceGroupName, jobCreateParameters);
Console.WriteLine("Created job");
var jobInputCreateParameters = new InputCreateOrUpdateParameters {
Input = new Input {
Name = inputName,
Properties = new StreamInputProperties {
Serialization = new JsonSerialization
{
Properties = new JsonSerializationProperties
{
Encoding = "UTF8"
}
},
DataSource = new EventHubStreamInputDataSource
{
Properties = new EventHubStreamInputDataSourceProperties
{
EventHubName = inputEventHubName,
ConsumerGroupName = inputConsumerGroup,
ServiceBusNamespace = ns,
SharedAccessPolicyName = "listen",
SharedAccessPolicyKey = inputEventHubListenKey
}
}
}
}
};
client.Inputs.CreateOrUpdate(resourceGroupName, streamAnalyticsJobName, jobInputCreateParameters);
Console.WriteLine("Created job input");
client.Inputs.TestConnection(resourceGroupName, streamAnalyticsJobName, streamAnalyticsInputName);
Console.WriteLine("Tested job input");
var jobOutputCreateParameters = new OutputCreateOrUpdateParameters {
Output = new Output {
Name = streamAnalyticsOutputName,
Properties = new OutputProperties {
Serialization = new JsonSerialization
{
Properties = new JsonSerializationProperties
{
Encoding = "UTF8",
Format = "LineSeparated"
}
},
DataSource = new EventHubOutputDataSource {
Properties = new EventHubOutputDataSourceProperties {
EventHubName = outputEventHubName,
ServiceBusNamespace = ns,
SharedAccessPolicyName = "send",
SharedAccessPolicyKey = outputEventHubSendKey
}
}
}
}
};
client.Outputs.CreateOrUpdate(resourceGroupName, streamAnalyticsJobName, jobOutputCreateParameters);
Console.WriteLine("Created job output");
client.Outputs.TestConnection(resourceGroupName, streamAnalyticsJobName, streamAnalyticsOutputName);
Console.WriteLine("Tested job output");
var transformationCreateParameters = new TransformationCreateOrUpdateParameters {
Transformation = new Transformation {
Name = streamAnalyticsTransformationName,
Properties = new TransformationProperties {
StreamingUnits = 1,
Query = query
}
}
};
client.Transformations.CreateOrUpdate(resourceGroupName, streamAnalyticsJobName, transformationCreateParameters);
Console.WriteLine("Created transformation");
var jobStartParameters = new JobStartParameters
{
OutputStartMode = OutputStartMode.CustomTime,
OutputStartTime = DateTime.UtcNow
};
client.StreamingJobs.Start(resourceGroupName, streamAnalyticsJobName, jobStartParameters);
Console.WriteLine("Started job");
Console.ReadLine();
The API is a little odd, in that it returns a Response object, but seems to throw if the request is not successful.