How to ingest data with the REST API

This article describes how to ingest data by using the REST API, without the Kusto.Ingest library.

The Kusto.Ingest library is the preferred way to ingest data into your database. However, you can achieve almost the same functionality without depending on the Kusto.Ingest package. This article shows how, by using queued ingestion to your database, the mode suited to production-grade pipelines.

This article deals with the recommended mode of ingestion. For the Kusto.Ingest library, its corresponding entity is the IKustoQueuedIngestClient interface. Here, the client code interacts with your database by posting ingestion notification messages to an Azure queue. References to the messages are obtained from the Kusto Data Management (also known as the Ingestion) service. Interaction with the service must be authenticated with Microsoft Entra ID.

The following code shows how the Kusto Data Management service handles queued data ingestion without using the Kusto.Ingest library. This example may be useful if full .NET is inaccessible or unavailable because of the environment, or other restrictions.

The code includes the steps to create an Azure Storage client and upload the data to a blob. Each step is described in greater detail after the sample code.

  1. Obtain an authentication token for accessing the ingestion service
  2. Query the ingestion service to obtain:
     - Ingestion resources (queues and blob containers)
     - A Kusto identity token that is added to every ingestion message
  3. Upload data to a blob on one of the blob containers obtained from Kusto in (2)
  4. Compose an ingestion message that identifies the target database and table and that points to the blob from (3)
  5. Post the ingestion message composed in (4) to an ingestion queue obtained in (2)
  6. Retrieve any error found by the service during ingestion

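// A container class for ingestion resources we are going to obtain
internal class IngestionResourcesSnapshot
{
    // Input queues and temporary blob containers (URIs with SAS), filled in by RetrieveIngestionResources
    public IList<string> IngestionQueues { get; set; } = new List<string>();
    public IList<string> TempStorageContainers { get; set; } = new List<string>();

    public string FailureNotificationsQueue { get; set; } = string.Empty;
    public string SuccessNotificationsQueue { get; set; } = string.Empty;
}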

public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
    // Your ingestion service URI
    var dmServiceBaseUri = @"{serviceURI}";
    // 1. Authenticate the interactive user (or application) to access Kusto ingestion service
    var bearerToken = AuthenticateInteractiveUser(dmServiceBaseUri);
    // 2a. Retrieve ingestion resources
    var ingestionResources = RetrieveIngestionResources(dmServiceBaseUri, bearerToken);
    // 2b. Retrieve Kusto identity token
    var identityToken = RetrieveKustoIdentityToken(dmServiceBaseUri, bearerToken);
    // 3. Upload file to one of the blob containers.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the containers in order to prevent throttling
    var blobName = $"TestData{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss.FFF}";
    var blobUriWithSas = UploadFileToBlobContainer(
        file, ingestionResources.TempStorageContainers.First(), blobName,
        out var blobSizeBytes
    );
    // 4. Compose ingestion command
    var ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
    // 5. Post ingestion command to one of the previously obtained ingestion queues.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the queues in order to prevent throttling
    PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);

    // Wait a fixed 20 seconds to give the service time to process the message
    Thread.Sleep(20000);

    // 6a. Read success notifications
    var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
    foreach (var sm in successes)
    {
        Console.WriteLine($"Ingestion completed: {sm}");
    }

    // 6b. Read failure notifications
    var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
    foreach (var em in errors)
    {
        Console.WriteLine($"Ingestion error: {em}");
    }
}
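
The comments in the sample recommend rotating across containers and queues when many blobs are ingested. The following is a minimal sketch of one way to do that with the base class library only. The RoundRobin<T> class and its Next method are hypothetical helpers for illustration; they are not part of Kusto.Ingest or the Azure SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper (not part of any SDK): hands out items in rotating order.
internal sealed class RoundRobin<T>
{
    private readonly IReadOnlyList<T> _items;
    private int _counter = -1;

    public RoundRobin(IReadOnlyList<T> items)
    {
        if (items == null || items.Count == 0)
            throw new ArgumentException("At least one item is required.", nameof(items));
        _items = items;
    }

    // Thread-safe: each call atomically advances the counter and returns the next item.
    public T Next()
    {
        // Cast to uint so the index stays non-negative after the counter wraps around.
        var ticket = unchecked((uint)Interlocked.Increment(ref _counter));
        return _items[(int)(ticket % (uint)_items.Count)];
    }
}
```

With this in place, one could create `new RoundRobin<string>(ingestionResources.TempStorageContainers.ToList())` once and call `Next()` for each upload instead of always taking `First()`; the same applies to the ingestion queues.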

Using queued ingestion for production-grade pipelines

Obtain authentication evidence from Microsoft Entra ID

// Authenticates the interactive user and retrieves Microsoft Entra Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
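    // Create an authentication client for Microsoft Entra ID.
    // {appId}, {appTenant}, and {appRedirectUri} are placeholders for your own app registration values.
    var authClient = PublicClientApplicationBuilder.Create("{appId}")
        .WithAuthority("https://login.microsoftonline.com/{appTenant}")
        .WithRedirectUri("{appRedirectUri}")
        .Build();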
    // Acquire user token for the interactive user:
    var result = authClient.AcquireTokenInteractive(
        new[] { $"{resource}/.default" } // Define scopes
    ).ExecuteAsync().Result;
    return result.AccessToken;
}

Retrieve ingestion resources

Manually construct an HTTP POST request to the Data Management service, requesting the return of the ingestion resources. These resources include queues that the DM service is listening on, and blob containers for data uploading. The Data Management service will process any messages containing ingestion requests that arrive on one of those queues.

// Retrieve ingestion resources (queues and blob containers) with SAS from specified ingestion service using supplied access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get ingestion resources\" }";
    var ingestionResources = new IngestionResourcesSnapshot();
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    // Input queues
    var tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
    foreach (var token in tokens)
    {
        ingestionResources.IngestionQueues.Add((string)token[1]);
    }
    // Temp storage containers
    tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
    foreach (var token in tokens)
    {
        ingestionResources.TempStorageContainers.Add((string)token[1]);
    }
    // Failure notifications queue
    var singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.FailureNotificationsQueue = (string)singleToken;
    // Success notifications queue
    singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.SuccessNotificationsQueue = (string)singleToken;
    return ingestionResources;
}

// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
    var request = WebRequest.Create(uriString);
    request.Method = "POST";
    request.ContentType = "application/json";
    request.ContentLength = Encoding.UTF8.GetByteCount(body); // length in bytes, not characters
    request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
    using var bodyStream = request.GetRequestStream();
    using (var sw = new StreamWriter(bodyStream))
    {
        sw.Write(body);
        sw.Flush();
    }
    bodyStream.Close();
    return request.GetResponse();
}
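
WebRequest is marked obsolete in .NET 6 and later. The following sketch shows how the same management request could be built with the HttpClient types instead. The BuildMgmtRequest helper and the MgmtRequestSketch class are hypothetical names introduced for illustration, and System.Text.Json is used here only to escape the command text.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;

internal static class MgmtRequestSketch
{
    // Hypothetical helper: builds the same POST that SendPostRequest issues.
    internal static HttpRequestMessage BuildMgmtRequest(string baseUri, string accessToken, string csl)
    {
        var request = new HttpRequestMessage(HttpMethod.Post, $"{baseUri.TrimEnd('/')}/v1/rest/mgmt");
        // Same bearer token header as in the WebRequest version.
        request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", accessToken);
        // Serializes to {"csl":"<command>"} with proper JSON escaping.
        var body = JsonSerializer.Serialize(new { csl });
        request.Content = new StringContent(body, Encoding.UTF8, "application/json");
        return request;
    }
}
```

Sending the request is then a matter of `await httpClient.SendAsync(request)` on a shared HttpClient instance and reading `response.Content`. Because StringContent computes the content length from the encoded bytes, there is no ContentLength to set by hand.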

Obtain a Kusto identity token

Ingest messages are handed off to your cluster via a non-direct channel (Azure queue), making it impossible to do in-band authorization validation for accessing the ingestion service. The solution is to attach an identity token to every ingest message. The token enables in-band authorization validation. This signed token can then be validated by the ingestion service when it receives the ingestion message.

// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get kusto identity token\" }";
    var jsonPath = "Tables[0].Rows[*].[0]";
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    var identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
    return (string)identityToken;
}

Upload data to the Azure Blob container

This step uploads a local file to an Azure Blob that will be handed off for ingestion. This code uses the Azure Storage SDK. If taking that dependency isn't possible, the same result can be achieved with the Azure Blob Service REST API.

// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string blobName, out long blobSize)
{
    var blobUri = new Uri(blobContainerUri);
    var blobContainer = new BlobContainerClient(blobUri);
    var blob = blobContainer.GetBlobClient(blobName);
    using (var stream = File.OpenRead(filePath))
    {
        // Upload synchronously so that the blob exists before its properties are read
        blob.Upload(stream);
        blobSize = blob.GetProperties().Value.ContentLength;
    }
    return $"{blob.Uri.AbsoluteUri}{blobUri.Query}";
}

Compose the ingestion message

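Use the Newtonsoft.Json package again to compose a valid ingestion request that identifies the target database and table and points to the blob. The message is then posted to the Azure queue that the relevant Kusto Data Management service listens on.

Keep the following in mind: the identity token must be included in every message, and this code passes it as the authorizationContext property of the AdditionalProperties object. The section Ingestion message internal structure describes each field of the message.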

internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
    var message = new JObject
    {
        { "Id", Guid.NewGuid().ToString() },
        { "BlobPath", dataUri },
        { "RawDataSize", blobSizeBytes },
        { "DatabaseName", db },
        { "TableName", table },
        { "RetainBlobOnSuccess", true }, // Do not delete the blob on success
        { "FlushImmediately", true }, // Do not aggregate
        { "ReportLevel", 2 }, // Report failures and successes (might incur perf overhead)
        { "ReportMethod", 0 }, // Failures are reported to an Azure Queue
        {
            "AdditionalProperties", new JObject(
                new JProperty("authorizationContext", identityToken),
                new JProperty("mappingReference", mappingRef),
                // Data is in JSON format
                new JProperty("format", "multijson")
            )
        }
    };
    return message.ToString();
}

Post the ingestion message to the ingestion queue

Finally, post the message that you constructed to the ingestion queue that you previously obtained.

If you are using .NET storage client versions v12 or above, you must properly encode the message content.
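
Storage client libraries before v12 Base64-encoded queue messages by default, while v12 sends the text as is. The following is a minimal sketch of encoding the message by hand before posting it, under the assumption that Base64 over the UTF-8 bytes is the encoding the service expects. EncodeQueueMessage and DecodeQueueMessage are hypothetical helper names.

```csharp
using System;
using System.Text;

internal static class QueueMessageEncodingSketch
{
    // Hypothetical helper: Base64-encodes the UTF-8 bytes of the message text.
    internal static string EncodeQueueMessage(string message) =>
        Convert.ToBase64String(Encoding.UTF8.GetBytes(message));

    // Inverse operation, useful when inspecting messages read back from a queue.
    internal static string DecodeQueueMessage(string encoded) =>
        Encoding.UTF8.GetString(Convert.FromBase64String(encoded));
}
```

The encoded text would then be passed on in place of the raw JSON, for example `PostMessageToQueue(queueUri, QueueMessageEncodingSketch.EncodeQueueMessage(ingestionMessage))`.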

internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    queue.SendMessage(message);
}

Check for error messages from the Azure queue

After ingestion, check for failure messages in the relevant queue that the Data Management service writes to. For more information on the failure message structure, see Ingestion failure message structure.

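// Reads up to `count` messages from the given queue and returns their text
internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    var messagesFromQueue = queue.ReceiveMessages(maxMessages: count).Value;
    var messages = messagesFromQueue.Select(m => m.MessageText);
    return messages;
}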
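
The sample waits a fixed 20 seconds before it reads the notification queues. The following sketch shows an alternative: poll at an interval until a message arrives or a timeout elapses. PollUntilAny is a hypothetical helper, and the timeout and interval values are for the caller to choose; nothing here is prescribed by the service.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

internal static class NotificationPollingSketch
{
    // Hypothetical helper: calls readMessages until it returns something or the timeout elapses.
    internal static IReadOnlyList<string> PollUntilAny(
        Func<IEnumerable<string>> readMessages, TimeSpan timeout, TimeSpan interval)
    {
        var deadline = DateTime.UtcNow + timeout;
        while (true)
        {
            // Materialize the batch so the queue is read exactly once per iteration.
            var batch = readMessages().ToList();
            if (batch.Count > 0 || DateTime.UtcNow >= deadline)
                return batch;
            Thread.Sleep(interval);
        }
    }
}
```

A call might look like `PollUntilAny(() => PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32), TimeSpan.FromMinutes(2), TimeSpan.FromSeconds(5))`. An empty result after the timeout only means that no notification was seen in that window.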

Ingestion messages - JSON document formats

Ingestion message internal structure

The message that the Kusto Data Management service expects to read from the input Azure Queue is a JSON document in the following format.

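{
    "Id": "<ID>",
    "BlobPath": "https://<AccountName>.blob.core.windows.net/<ContainerName>/<PathToBlob>?<SasToken>",
    "RawDataSize": <RawDataSizeInBytes>,
    "DatabaseName": "<DatabaseName>",
    "TableName": "<TableName>",
    "RetainBlobOnSuccess": <true|false>,
    "FlushImmediately": <true|false>,
    "ReportLevel": <0-Failures, 1-None, 2-All>,
    "ReportMethod": <0-Queue, 1-Table>,
    "AdditionalProperties": { "<PropertyName>": "<PropertyValue>" }
}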

Property | Description
Id | Message identifier (GUID)
BlobPath | Path (URI) to the blob, including the SAS key granting permissions to read/write/delete it. Permissions are required so that the ingestion service can delete the blob once it has completed ingesting the data.
RawDataSize | Size of the uncompressed data in bytes. Providing this value allows the ingestion service to optimize ingestion by potentially aggregating multiple blobs. This property is optional, but if not given, the service will access the blob just to retrieve the size.
DatabaseName | Target database name
TableName | Target table name
RetainBlobOnSuccess | If set to true, the blob won't be deleted once ingestion is successfully completed. Default is false
FlushImmediately | If set to true, any aggregation will be skipped. Default is false
ReportLevel | Success/Error reporting level: 0-Failures, 1-None, 2-All
ReportMethod | Reporting mechanism: 0-Queue, 1-Table
AdditionalProperties | Other properties such as format, tags, and creationTime. For more information, see data ingestion properties.

Ingestion failure message structure

The message that the Data Management service writes to the failure notifications queue is a JSON document with the following properties.

Property | Description
OperationId | Operation identifier (GUID) that can be used to track the operation on the service side
Database | Target database name
Table | Target table name
FailedOn | Failure timestamp
IngestionSourceId | GUID identifying the data chunk that failed to ingest
IngestionSourcePath | Path (URI) to the data chunk that failed to ingest
Details | Failure message
ErrorCode | The error code. For all the error codes, see Ingestion error codes.
FailureStatus | Indicates whether the failure is permanent or transient
RootActivityId | The correlation identifier (GUID) that can be used to track the operation on the service side
OriginatesFromUpdatePolicy | Indicates whether the failure was caused by an erroneous transactional update policy
ShouldRetry | Indicates whether the ingestion could succeed if retried as is
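
To show how these properties might be consumed, the following sketch summarizes a failure notification and reports whether a retry looks worthwhile. It uses System.Text.Json rather than the Newtonsoft.Json package used elsewhere in this article, purely to stay dependency-free. ShouldRetryFailure is a hypothetical helper, and the sketch assumes that ShouldRetry is serialized as a JSON boolean.

```csharp
using System;
using System.Text.Json;

internal static class FailureMessageSketch
{
    // Hypothetical helper: builds a one-line summary and returns the ShouldRetry flag.
    internal static bool ShouldRetryFailure(string failureJson, out string summary)
    {
        using var doc = JsonDocument.Parse(failureJson);
        var root = doc.RootElement;

        // Reads a property as text, tolerating properties that are absent.
        string Text(string name) =>
            root.TryGetProperty(name, out var value) ? value.ToString() : "<missing>";

        summary = $"{Text("Database")}.{Text("Table")}: {Text("ErrorCode")} " +
                  $"({Text("FailureStatus")}) - {Text("Details")}";

        // Assumption: ShouldRetry is a JSON boolean; anything else counts as "do not retry".
        return root.TryGetProperty("ShouldRetry", out var retry)
            && retry.ValueKind == JsonValueKind.True;
    }
}
```

Each string returned by PopTopMessagesFromQueue for the failure queue could be passed to this helper, after Base64 decoding if the messages turn out to be encoded. The OperationId and RootActivityId values are the ones to keep when the failure needs to be tracked on the service side.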