Creating a Stream Analytics Query job programmatically

There are plenty of examples of creating Azure Stream Analytics Query jobs, but most of them use the wizard in the management portal.

I found this example, which has most of the info needed. Unfortunately, it authenticates by popping up a login form. Not ideal for a server application!

My next attempt was to use a cert, but it turns out the new Azure Resource Manager APIs only work with Azure AD. I started working my way through this example:

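            // tenantId is the Azure AD tenant (directory) ID; a "{tenantId}" placeholder inside the literal would be sent as-is
            var authContext = new AuthenticationContext("https://login.microsoftonline.com/" + tenantId + "/oauth2/token");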
            var clientCredential = new ClientCredential(clientId, appKey);

            var result = authContext.AcquireToken("https://management.core.windows.net/", clientCredential);

            var creds = new TokenCloudCredentials(subscriptionId, result.AccessToken);
            var client = new StreamAnalyticsManagementClient(creds);

I could acquire a token without any problem, but when I tried to create an SA query job I got an auth error:

AuthorizationFailed: The client 'REDACTED' with object id 'REDACTED' does not have authorization to perform action 'Microsoft.StreamAnalytics/streamingjobs/write' over scope '/subscriptions/REDACTED/resourcegroups/REDACTED/providers/Microsoft.StreamAnalytics/streamingjobs/REDACTED'

With some help from SO, it became clear that my AD app didn’t have the necessary permissions. There currently isn’t a built-in role with sufficient authority, so we had to drop the hammer and use the “Contributor” role, which can do just about anything:

New-AzureRoleAssignment -ServicePrincipalName {azureADAppUri} -RoleDefinitionName Contributor -Scope /subscriptions/{subscriptionId}/resourcegroups/{resourceGroupId}/providers/Microsoft.StreamAnalytics/streamingjobs/*

With that in place, creating the query job is trivial:

            var jobCreateParameters = new JobCreateOrUpdateParameters
            {
                Job = new Job
                {
                    Name = streamAnalyticsJobName,
                    Location = "North Europe",
                    Properties = new JobProperties
                    {
                        EventsOutOfOrderPolicy = EventsOutOfOrderPolicy.Adjust,
                        Sku = new Sku
                        {
                            Name = "Standard"
                        }
                    }
                }
            };

            client.StreamingJobs.CreateOrUpdate(resourceGroupName, jobCreateParameters);

            Console.WriteLine("Created job");

            var jobInputCreateParameters = new InputCreateOrUpdateParameters {
                Input = new Input {
                    Name = streamAnalyticsInputName,
                    Properties = new StreamInputProperties {
                        Serialization = new JsonSerialization
                        {
                            Properties = new JsonSerializationProperties
                            {
                                Encoding = "UTF8"
                            }
                        },
                        DataSource = new EventHubStreamInputDataSource
                        {
                            Properties = new EventHubStreamInputDataSourceProperties
                            {
                                EventHubName = inputEventHubName,
                                ConsumerGroupName = inputConsumerGroup,
                                ServiceBusNamespace = ns,
                                SharedAccessPolicyName = "listen",
                                SharedAccessPolicyKey = inputEventHubListenKey
                            }
                        }
                    }
                }
            };

            client.Inputs.CreateOrUpdate(resourceGroupName, streamAnalyticsJobName, jobInputCreateParameters);

            Console.WriteLine("Created job input");


            client.Inputs.TestConnection(resourceGroupName, streamAnalyticsJobName, streamAnalyticsInputName);

            Console.WriteLine("Tested job input");

            var jobOutputCreateParameters = new OutputCreateOrUpdateParameters {
                Output = new Output {
                    Name = streamAnalyticsOutputName,
                    Properties = new OutputProperties {
                        Serialization = new JsonSerialization
                        {
                            Properties = new JsonSerializationProperties
                            {
                                Encoding = "UTF8",
                                Format = "LineSeparated"
                            }
                        },
                        DataSource = new EventHubOutputDataSource {
                            Properties = new EventHubOutputDataSourceProperties {
                                EventHubName = outputEventHubName,
                                ServiceBusNamespace = ns,
                                SharedAccessPolicyName = "send",
                                SharedAccessPolicyKey = outputEventHubSendKey
                            }
                        }
                    }
                }
            };

            client.Outputs.CreateOrUpdate(resourceGroupName, streamAnalyticsJobName, jobOutputCreateParameters);

            Console.WriteLine("Created job output");

            client.Outputs.TestConnection(resourceGroupName, streamAnalyticsJobName, streamAnalyticsOutputName);

            Console.WriteLine("Tested job output");

            var transformationCreateParameters = new TransformationCreateOrUpdateParameters {
                Transformation = new Transformation {
                    Name = streamAnalyticsTransformationName,
                    Properties = new TransformationProperties {
                        StreamingUnits = 1,
                        Query = query
                    }
                }
            };

            client.Transformations.CreateOrUpdate(resourceGroupName, streamAnalyticsJobName, transformationCreateParameters);

            Console.WriteLine("Created transformation");

            var jobStartParameters = new JobStartParameters
            {
                OutputStartMode = OutputStartMode.CustomTime,
                OutputStartTime = DateTime.UtcNow
            };

            client.StreamingJobs.Start(resourceGroupName, streamAnalyticsJobName, jobStartParameters);

            Console.WriteLine("Started job");

            Console.ReadLine();
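
The listing uses a `query` variable without showing its value. As a minimal sketch, assuming a simple pass-through job, the query text could be built like this. `QueryBuilder` and `PassThrough` are hypothetical names for illustration, not part of the SDK; the names in the query must match the `Name` given to the job's input and output.

```csharp
using System;

public static class QueryBuilder
{
    // Builds a pass-through Stream Analytics query that copies every event
    // from the named input to the named output.
    // Hypothetical helper for illustration; not part of the Azure SDK.
    public static string PassThrough(string inputName, string outputName)
    {
        if (string.IsNullOrWhiteSpace(inputName))
            throw new ArgumentException("Input name is required", "inputName");
        if (string.IsNullOrWhiteSpace(outputName))
            throw new ArgumentException("Output name is required", "outputName");

        // Square brackets quote the identifiers, as in the portal's default query.
        return string.Format("SELECT * INTO [{0}] FROM [{1}]", outputName, inputName);
    }
}
```

With the variables from the listing, this would be called as `var query = QueryBuilder.PassThrough(streamAnalyticsInputName, streamAnalyticsOutputName);` before creating the transformation.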

The API is a little odd, in that it returns a Response object, but seems to throw if the request is not successful.
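
Because failures surface as exceptions rather than through the returned Response, one option is to wrap each call so a failed step is reported with its description. This is only a sketch: `Step.TryRun` is a hypothetical helper, and it catches the general `Exception` type because the exact exception type thrown by the SDK is not stated here.

```csharp
using System;

public static class Step
{
    // Runs one provisioning step, logs the outcome, and returns true on success.
    // Hypothetical helper for illustration; not part of the Azure SDK.
    public static bool TryRun(string description, Action action)
    {
        try
        {
            action();
            Console.WriteLine("OK: " + description);
            return true;
        }
        catch (Exception ex)
        {
            // The specific SDK exception type is an unknown here, so catch broadly.
            Console.Error.WriteLine("FAILED: " + description + " (" + ex.Message + ")");
            return false;
        }
    }
}
```

A call from the listing would then look like `Step.TryRun("create job", () => client.StreamingJobs.CreateOrUpdate(resourceGroupName, jobCreateParameters));`, and the caller can stop at the first step that returns false.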
