Vai al contenuto

Data parsing and querying

Our XDR solution includes standard procedures and a specific language for executing queries aimed at analyzing the available information.
The various information can be organized into dashboards, each of which contains different display panels.

To create a new panel, simply go to a dashboard and click on "Add a new panel":
Alt text
The screen for creating a panel allows you to set up a search query to be run on a specific data source, that is, on a specific index or table in the data repository. Each panel must be associated with a visualization through which to display the data.

Below is the screen for creating a panel:
Alt text

The query language offered by our solution supports, among other things, the following functionalities: filtering to identify specific conditions, selecting a subset of fields to display, transforming certain fields returned by the query, calculation averages, counts, etc., the ability to sort results, and deduplication management. There is a set of predefined queries. Each query can be associated with a dashboard to enable continuous monitoring or to run it on-demand on data pertaining to a specific time interval.

Each query can be transformed into a BIOC, associating an alert with it. This allows monitoring and detecting new suspicious events or searching for that BIOC in existing data that triggers it.

Data parsing

There is a specific platform for managing files containing the configuration of parsing rules, essentially filters applied to raw data ingested by the data ingestion component. These filters allow the elimination of potentially unnecessary data, aiming to reduce storage costs. Additionally, they enable data enrichment with tags that can be used by the rest of the collection flow. For example, here is an excerpt from the configuration that adds tags if certain conditions are met:

if [process][name] =~ /^dhcpd$/ {

    mutate {

      add_tag => [ "dhcp", "dhcpdv4", "firewall" ]

      add_field => { "[event][dataset]" => "pfAzSentinel.dhcp" }

    }

    grok {

      patterns_dir => [ "/usr/share/logstash/patterns" ]

      match => [ "filter_message", "%{DHCPD}"]

    }

  }

The parsing filters also allow the addition of valuable information for threat detection, such as geolocation of an IP address. Below is the configuration snippet created to fulfill the aforementioned purpose:

if [destination][ip] {

      ### Check if destination.ip address is private

      cidr {

        address => [ "%{[destination][ip]}" ]

        network => [ "0.0.0.0/32", "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "fc00::/7", "127.0.0.0/8", "::1/128", "169.254.0.0/16", "fe80::/10", "224.0.0.0/4", "ff00::/8", "255.255.255.255/32", "::" ]

        add_tag => "IP_Private_Destination"

      }

      if "IP_Private_Destination" not in [tags] {

        geoip {

          source => "[destination][ip]"

#MMR#          database => "/var/lib/GeoIP/GeoLite2-City.mmdb"

          target => "[destination][geo]"

        }

        geoip {

          default_database_type => 'ASN'

#MMR#          database => "/var/lib/GeoIP/GeoLite2-ASN.mmdb"

          source => "[destination][ip]"

          target => "[destination][as]"

        }

        mutate {

          rename => { "[destination][as][asn]" => "[destination][as][number]"}

          rename => { "[destination][as][as_org]" => "[destination][as][organization][name]"}

          rename => { "[destination][geo][country_code2]" => "[destination][geo][country_iso_code]"}

          rename => { "[destination][geo][region_code]" => "[destination][geo][region_iso_code]"}

          add_tag => "GeoIP_Destination"

        }

      }

    }