Microsoft Sentinel - Ingest Data to Log Analytics Workspace using Azure Function.

How to ingest logs to Log Analytics using API?

Introduction

This article is going to talk about how you can utilize Azure function and a python script to ingest log data to Log Analytics workspace. I have got this knowledge from various Microsoft learn documentation and blogs.

Reader Must have knowledge of following skills and concept:

1.    Azure Fundamentals.

2.    Azure Functions, how to deploy using VS Code.

3.    Azure Sentinel and Log Analytics, how it works.

4.    How API works.

5.    Basic Python programming knowledge.

Prerequisites

Before you begin, make sure that you have the following requirements in place:

MS Sentinel and Log Analytics

·        An Azure account with an active subscription. Create an account for free.

·        Microsoft Sentinel and Log Analytics should be deployed under you subscription.

·        You must have read and write permissions on the Microsoft Sentinel workspace.

·        You must have read and write permissions on Azure Functions to create a Function App.

·        You must have read permissions to shared keys for the workspace

·        Azure Log Analytics URL e.g. https://<yourworkspaceID>.ods.opinsights.azure.com/api/logs?api-version=2016-04-01

·        Azure Log Analytics Workspace ID

·        Azure Log Analytics Workspace Key

·        Custom Log Name i.e. the table where the log data will be written to 

Azure Function

         Storage account  to store Azure Function Persistence data, you may want to use existing account or create a new one.
         Select Consumption (Serverless) as your Plan type.

 

 

VS Code and Python

·        The Azure Functions Core Tools, version 4.0.4785 or a later version.

·        Python versions that are supported by Azure Functions. For more information, see How to install Python.

·        Visual Studio Code on one of the supported platforms.

·        The Python extension for Visual Studio Code.

·        The Azure Functions extension for Visual Studio Code, version 1.8.1 or later.

·        The Azurite V3 extension local storage emulator. While you can also use an actual Azure storage account, this article assumes you're using the Azurite emulator.

Important

Functions doesn't currently support Python function development on ARM64 devices. To develop Python functions on a Mac with an M1 chip, you must run in an emulated x86 environment. To learn more, see x86 emulation on ARM64.

 

Script Sections.

Below is the Link to the sample script.

https://github.com/ajinkyadg/MicrosoftSentinelConnectorSample

Do the imports.

Below are the imports you need to make for this script to run

'''

This function is will download logs from one pass and push it into LogAnalytics Table in Sentinel.

 

'''

import os

import datetime

import logging

import requests

import json

import datetime

import azure.functions as func

import base64

import hmac

import hashlib

import re

 

 

 

 

 

 

 

 

 

 

 

Setup the Global Variables

#Setup Global Variables

connection_string = os.environ['AzureWebJobsStorage']

customer_id = os.environ['WorkspaceID']

shared_key = os.environ['WorkspaceKey']

logAnalyticsUri = 'https://' + customer_id + '.ods.opinsights.azure.com'

log_type = 'Test_DataIngestion'

logAnalyticsUri = os.environ.get('logAnalyticsUri')

if ((logAnalyticsUri in (None, '') or str(logAnalyticsUri).isspace())):

    logAnalyticsUri = 'https://' + customer_id + '.ods.opinsights.azure.com'

 

pattern = r"https:\/\/([\w\-]+)\.ods\.opinsights\.azure.([a-zA-Z\.]+)$"

match = re.match(pattern,str(logAnalyticsUri))

if(not match):

    raise Exception("Invalid Log Analytics Uri.")

 

 

Generate Hash for Authorization.

 Hash to authorize with Log Analytics API

def build_signature(customer_id, shared_key, date, content_length, method, content_type, resource):

    x_headers = 'x-ms-date:' + date

    string_to_hash = method + "\n" + str(content_length) + "\n" + content_type + "\n" + x_headers + "\n" + resource

    bytes_to_hash = bytes(string_to_hash, encoding="utf-8")

    decoded_key = base64.b64decode(shared_key)

    encoded_hash = base64.b64encode(hmac.new(decoded_key, bytes_to_hash, digestmod=hashlib.sha256).digest()).decode()

    authorization = "SharedKey {}:{}".format(customer_id,encoded_hash)

    return authorization

 

 

Post Data to LogAnalytics

def post_data(body):

    method = 'POST'

    content_type = 'application/json'

    resource = '/api/logs'

    rfc1123date = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')

    content_length = len(body)

    signature = build_signature(customer_id, shared_key, rfc1123date, content_length, method, content_type, resource)

    uri = logAnalyticsUri + resource + "?api-version=2016-04-01"

    headers = {

        'content-type': content_type,

        'Authorization': signature,

        'Log-Type': log_type,

        'x-ms-date': rfc1123date

    }

    response = requests.post(uri,data=body, headers=headers)

    if (response.status_code >= 200 and response.status_code <= 299):

        return response.status_code

    else:

        logging.warn("Events are not processed into Azure. Response code: {}".format(response.status_code))

        return None

 

 

Main function

To setup variables and call the data ingestion function.

def main(req: func.HttpRequest) -> func.HttpResponse:

    logging.info('Python HTTP trigger function processed a request.')

    body = json.dumps({

            "uuid": "asdfasdfasdf",

            "timestamp": "2023-02-10T12:55:49.903Z",

            "used_version": 16,

            "vault_uuid": "asdfasdfasdf",

            "item_uuid": "asdfasdfasdfa",

            "user": {

                "uuid": "asdfasdfasdfasdfasd",

                "name": "asfa asdfasdf",

                "email": "asdf.asdfs@test.com"

            },

            "client": {

                "app_name": "1Password Browser Extension",

                "app_version": "20236",

                "platform_name": "Chrome",

                "platform_version": "109.0.0.0",

                "os_name": "MacOSX",

                "os_version": "10.15.7",

                "ip_address": "11.11.11.11"

            },

            "location": {

                "country": "Cattegat",

                "region": "Uppsala",

                "city": "Jormsburg",

                "latitude": 11.3591,

                "longitude": 11.9948

            },

            "action": "fill"

        })

    post_data(body)

    name = req.params.get('name')

    if not name:

        try:

            req_body = req.get_json()

        except ValueError:

            pass

        else:

            name = req_body.get('name')

 

    if name:

        return func.HttpResponse(f"Hello, {name}. This HTTP triggered function executed successfully.")

    else:

        return func.HttpResponse(

             "This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response.",

             status_code=200

        )

 

 

References

How to develop Azure Function using VS Code?

https://learn.microsoft.com/en-us/azure/azure-functions/functions-develop-vs-code?tabs=python

How Azure Function works?

https://learn.microsoft.com/en-us/azure/azure-functions/functions-overview

What are the different ways to Create MS Sentinel Custom Connectors?

https://techcommunity.microsoft.com/t5/microsoft-sentinel-blog/azure-sentinel-creating-custom-connectors/ba-p/864060

How to build a script to Ingest data to MS Sentinel using Azure function?

https://techcommunity.microsoft.com/t5/microsoft-sentinel-blog/ingest-fastly-web-application-firewall-logs-into-azure-sentinel/ba-p/1238804

Where to find the list of supported custom connectors?

https://learn.microsoft.com/en-us/azure/sentinel/data-connectors-reference

 

Comments

Popular posts from this blog

Troubleshooting Windows remote Event/Log Collection.(IBM Wincollect, Logrhythm SMA)