There are plenty of examples of creating Azure Stream Analytics Query jobs, but most of them use the wizard in the management portal.
I found this example, which has most of the info needed; but unfortunately, it authenticates by popping up a login form. Not ideal for a server application!
My next attempt was using a cert, but it turns out the new Azure Resource Manager APIs only work with Azure AD authentication. I started working my way through this example:
var authContext = new AuthenticationContext("https://login.microsoftonline.com/{tenantId}/oauth2/token");
var clientCredential = new ClientCredential(clientId, appKey);
var result = authContext.AcquireToken("https://management.core.windows.net/", clientCredential);
var creds = new TokenCloudCredentials(subscriptionId, result.AccessToken);
var client = new StreamAnalyticsManagementClient(creds);
I could acquire a token without any problem, but when I tried to create a Stream Analytics job I got an authorization error:
AuthorizationFailed: The client 'REDACTED' with object id 'REDACTED' does not have authorization to perform action 'Microsoft.StreamAnalytics/streamingjobs/write' over scope '/subscriptions/REDACTED/resourcegroups/REDACTED/providers/Microsoft.StreamAnalytics/streamingjobs/REDACTED'
With some help from SO, it became clear that my AD app didn’t have the necessary permissions. There currently isn’t a built-in role with sufficient authority, so we had to drop the hammer and use the “Contributor” role, which can do just about anything:
New-AzureRoleAssignment -ServicePrincipalName {azureADAppUri} -RoleDefinitionName Contributor -Scope /subscriptions/{subscriptionId}/resourcegroups/{resourceGroupId}/providers/Microsoft.StreamAnalytics/streamingjobs/*
With that in place, creating the query job is trivial:
// Create the job itself
var jobCreateParameters = new JobCreateOrUpdateParameters
{
    Job = new Job
    {
        Name = streamAnalyticsJobName,
        Location = "North Europe",
        Properties = new JobProperties
        {
            EventsOutOfOrderPolicy = EventsOutOfOrderPolicy.Adjust,
            Sku = new Sku { Name = "Standard" }
        }
    }
};
client.StreamingJobs.CreateOrUpdate(resourceGroupName, jobCreateParameters);
Console.WriteLine("Created job");

// Add an Event Hub input that reads JSON
var jobInputCreateParameters = new InputCreateOrUpdateParameters
{
    Input = new Input
    {
        Name = streamAnalyticsInputName,
        Properties = new StreamInputProperties
        {
            Serialization = new JsonSerialization
            {
                Properties = new JsonSerializationProperties { Encoding = "UTF8" }
            },
            DataSource = new EventHubStreamInputDataSource
            {
                Properties = new EventHubStreamInputDataSourceProperties
                {
                    EventHubName = inputEventHubName,
                    ConsumerGroupName = inputConsumerGroup,
                    ServiceBusNamespace = ns,
                    SharedAccessPolicyName = "listen",
                    SharedAccessPolicyKey = inputEventHubListenKey
                }
            }
        }
    }
};
client.Inputs.CreateOrUpdate(resourceGroupName, streamAnalyticsJobName, jobInputCreateParameters);
Console.WriteLine("Created job input");
client.Inputs.TestConnection(resourceGroupName, streamAnalyticsJobName, streamAnalyticsInputName);
Console.WriteLine("Tested job input");

// Add an Event Hub output that writes line-separated JSON
var jobOutputCreateParameters = new OutputCreateOrUpdateParameters
{
    Output = new Output
    {
        Name = streamAnalyticsOutputName,
        Properties = new OutputProperties
        {
            Serialization = new JsonSerialization
            {
                Properties = new JsonSerializationProperties { Encoding = "UTF8", Format = "LineSeparated" }
            },
            DataSource = new EventHubOutputDataSource
            {
                Properties = new EventHubOutputDataSourceProperties
                {
                    EventHubName = outputEventHubName,
                    ServiceBusNamespace = ns,
                    SharedAccessPolicyName = "send",
                    SharedAccessPolicyKey = outputEventHubSendKey
                }
            }
        }
    }
};
client.Outputs.CreateOrUpdate(resourceGroupName, streamAnalyticsJobName, jobOutputCreateParameters);
Console.WriteLine("Created job output");
client.Outputs.TestConnection(resourceGroupName, streamAnalyticsJobName, streamAnalyticsOutputName);
Console.WriteLine("Tested job output");

// Add the transformation (the query itself)
var transformationCreateParameters = new TransformationCreateOrUpdateParameters
{
    Transformation = new Transformation
    {
        Name = streamAnalyticsTransformationName,
        Properties = new TransformationProperties
        {
            StreamingUnits = 1,
            Query = query
        }
    }
};
client.Transformations.CreateOrUpdate(resourceGroupName, streamAnalyticsJobName, transformationCreateParameters);
Console.WriteLine("Created transformation");

// Start the job, producing output from now onwards
var jobStartParameters = new JobStartParameters
{
    OutputStartMode = OutputStartMode.CustomTime,
    OutputStartTime = DateTime.UtcNow
};
client.StreamingJobs.Start(resourceGroupName, streamAnalyticsJobName, jobStartParameters);
Console.WriteLine("Started job");
Console.ReadLine();
The API is a little odd, in that it returns a Response object, but seems to throw if the request is not successful.
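Since failures seem to surface as exceptions rather than through the Response object, one way to cope is a small wrapper that turns a throwing call into a success flag. This is only a sketch: SdkCall and TryInvoke are hypothetical names of my own, not part of the SDK, and it catches Exception broadly because I haven't confirmed which exception type the client actually throws.

```csharp
using System;

public static class SdkCall
{
    // Hypothetical helper (not part of the Stream Analytics SDK).
    // Runs a call that throws on failure and reports the outcome as a bool,
    // handing back either the response or the exception that was thrown.
    public static bool TryInvoke<T>(Func<T> call, out T response, out Exception error)
    {
        try
        {
            response = call();
            error = null;
            return true;
        }
        catch (Exception ex)
        {
            // The exact exception type is an unknown here, so catch broadly.
            response = default(T);
            error = ex;
            return false;
        }
    }
}
```

Wrapping a call such as client.StreamingJobs.CreateOrUpdate(resourceGroupName, jobCreateParameters) in a lambda and passing it to SdkCall.TryInvoke would then let you log error.Message and carry on, rather than letting the exception take down the process. Once you know the concrete exception type, narrowing the catch would be the better choice.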