The Elastic Stack has established itself as a leader among open-source products for big data analysis, data collection, and visualization. The stack, commonly abbreviated as ELK, contains the following components:
- Elasticsearch - the data indexing and analysis service
- Logstash - a service for pre-processing data before delivering it to Elasticsearch
- Kibana - a web UI for analyzing the data. Recently, Elastic added SIEM features to the Kibana dashboards, along with support for the Endgame EDR service.
- Beats add-ons - ELK offers many add-ons, or agents, for data collection, such as Filebeat, Winlogbeat, and others
Because Logstash offers many plugins and add-ons for processing data, it can be used with InTrust as well. Filtered Windows events can also appear in the SIEM dashboards once the index with InTrust events is added to the corresponding workspace.
Here is what needs to be done to configure Logstash, Elasticsearch, and Kibana to parse events collected and produced by InTrust:
- Deploy Elasticsearch, Kibana, and Logstash.
- Install and deploy Winlogbeat along with its Kibana components (you might need to specify the Kibana server in the configuration). This is necessary to get the Winlogbeat index template, which we are going to reuse for InTrust data.
- Go to Management > Index Management > Index Templates, click the Actions button next to the winlogbeat-x.x.x template, and clone it into an intrust template. Specify "intrust-*" as the index pattern. In the mappings section, go to advanced options and set the "beat" metadata to "intrust"; the version can be set to the corresponding InTrust version (11.4.1 in my case). Finish the index template creation process.
- Go to Advanced Settings, navigate to the SIEM section, and add the "intrust-*" index pattern to the SIEM indexes.
- Also create an "intrust-*" index pattern under Management > Kibana > Index Patterns so you can search through all InTrust data using the basic Kibana Discover capabilities.
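The template clone performed in the Kibana UI above can also be scripted. Below is a minimal Python sketch of the transformation: it assumes you have already fetched the Winlogbeat template JSON into a dict (for example via the legacy `GET _template/winlogbeat-7.6.0` API; the exact endpoint and template layout depend on your Elastic version), and the helper name is illustrative:

```python
import copy

def clone_winlogbeat_template(winlogbeat_template, intrust_version="11.4.1"):
    """Derive an 'intrust' index template from an existing Winlogbeat one,
    mirroring the manual clone done in Kibana Index Management.
    winlogbeat_template: dict parsed from the template JSON (layout assumed)."""
    tpl = copy.deepcopy(winlogbeat_template)
    # Point the clone at InTrust indexes instead of winlogbeat-* ones
    tpl["index_patterns"] = ["intrust-*"]
    # Override the beat metadata so dashboards recognise the data source
    meta = tpl.setdefault("mappings", {}).setdefault("_meta", {})
    meta["beat"] = "intrust"
    meta["version"] = intrust_version
    return tpl
```

The resulting dict can then be pushed back, e.g. with `PUT _template/intrust` (again, adjust the API path for your Elastic version).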
Now we need to send data from InTrust to the ELK stack. For this, we configure ELK to accept and pre-parse the data. By default, Logstash picks up any *.conf file placed in the "/etc/logstash/conf.d/" folder, so you can create an intrust.conf file with the following contents:
```conf
input {
  ################ Enable UDP section if necessary ##########################
  # udp {
  #   port => 55514
  #   type => "rfc5424"
  #   codec => multiline {
  #     pattern => "^<%{NONNEGINT:syslog_pri}>"
  #     negate => true
  #     what => "previous"
  #   }
  # }
  ###########################################################################
  tcp {
    port => 55514
    type => "rfc5424"
    codec => multiline {
      pattern => "^<%{NONNEGINT:syslog_pri}>"
      negate => true
      what => "previous"
    }
  }
}
filter {
  if [type] == "rfc5424" {
    grok {
      #
      # Regexp to capture RFC 5424 parts of the InTrust message accurately (with newline characters)
      #
      match => { "message" => "(?m)<%{NONNEGINT:syslog_pri}>%{NONNEGINT:version}%{SPACE}(?:-|%{TIMESTAMP_ISO8601:syslog_timestamp})%{SPACE}(?:-|%{IPORHOST:hostname})%{SPACE}(?:%{SYSLOG5424PRINTASCII:program}|-)%{SPACE}(?:-|%{SYSLOG5424PRINTASCII:process_id})%{SPACE}(?:-|%{SYSLOG5424PRINTASCII:message_id})%{SPACE}(?:-|(?<structured_data>(\[.*?[^\\]\])+))(?:%{SPACE}(?m)(?<syslog_message>(.|\r|\n)*)|%{GREEDYDATA:syslog_other})" }
      add_tag => [ "match" ]
    }
    if "match" in [tags] {
      syslog_pri {
        remove_field => "syslog_pri"
      }
      date {
        match => [ "syslog_timestamp", "ISO8601", "MMM dd HH:mm:ss", "MMM dd HH:mm:ss.SSS" ]
        remove_field => "syslog_timestamp"
      }
      if [structured_data] {
        ruby {
          code => '
            #
            # Ruby code to parse key=value parameters from the RFC 5424 InTrust message accurately (some may include newline characters)
            #
            def extract_syslog5424_sd(syslog5424_sd)
              sd = {}
              syslog5424_sd.scan(/\[(?<element>(.|\r|\n|\t)*?)\]/) do |element|
                data = element[0].match(/(?<sd_id>[^\ ]+)(?<sd_params> (.|\r|\n)*)?/)
                sd_id = data[:sd_id].split("@", 2)[0]
                sd[sd_id] = {}
                next if data.nil? || data[:sd_params].nil?
                data[:sd_params].scan(/ (.*?[=](?:""|"(.|\r|\n|\t)*?"))/) do |set|
                  set = set[0].match(/(?<param_name>.*?)[=]\"(?<param_value>(.|\r|\n|\t)*)\"/)
                  sd[sd_id][set[:param_name]] = set[:param_value]
                end
              end
              sd
            end
            event.set("[sd]", extract_syslog5424_sd(event.get("[structured_data]")))
          '
          remove_field => "structured_data"
        }
      }
      #
      # Converting InTrust parsing into Elastic common fields, specifically focused on the winlogbeat index structure
      #
      mutate {
        rename => {"host" => "syslog.host.ip"}
        rename => {"message" => "message_raw"}
        rename => {"syslog_message" => "message"}
        add_field => {
          "event.category" => "%{[sd][InTrust.Predefined][Category]}"
          "event.action" => "%{[sd][InTrust.Predefined][Category]}"
          "event.module" => "%{[sd][InTrust.Predefined][GatheringEventLog]}"
          "event.provider" => "%{[sd][InTrust.Predefined][SourceName]}"
          "winlog.event_id" => "%{[sd][InTrust.Predefined][EventID]}"
          "winlog.computer_name" => "%{[sd][InTrust.Predefined][ComputerName]}"
          "winlog.record_id" => "%{[sd][InTrust.Predefined][RecordKey]}"
          "host.os.version" => "%{[sd][InTrust.Predefined][VersionMajor]}.%{[sd][InTrust.Predefined][VersionMinor]}"
        }
        replace => {"host.name" => "%{[sd][InTrust.Predefined][ComputerName]}"}
      }
      if [sd][InTrust.Named][Result] {
        mutate {
          add_field => { "event.outcome" => "%{[sd][InTrust.Named][Result]}" }
        }
      }
      if [sd][InTrust.Named][Who] {
        mutate {
          add_field => {"user.name" => "%{[sd][InTrust.Named][Who]}"}
        }
      }
      if [sd][InTrust.Named][Security_ID] {
        mutate {
          add_field => {"user.id" => "%{[sd][InTrust.Named][Security_ID]}"}
        }
      }
      if [sd][InTrust.Named][WhoDomain] {
        mutate {
          add_field => {
            "user.domain" => "%{[sd][InTrust.Named][WhoDomain]}"
            "winlog.user.domain" => "%{[sd][InTrust.Named][WhoDomain]}"
          }
        }
      }
      if [sd][InTrust.Named][Who] {
        mutate {
          add_field => {"winlog.user.name" => "%{[sd][InTrust.Named][Who]}"}
        }
      }
      #
      # Normalizing InTrust security alerts for Elastic SIEM
      #
      if [event.action] == "Rule Match" {
        mutate {
          add_field => {"event.kind" => "alert"}
          replace => {
            "event.action" => "Security Alert"
            "event.category" => "%{[sd][InTrust.Named][What]}"
          }
        }
      } else {
        mutate {
          add_field => {"event.kind" => "event"}
        }
      }
      mutate { lowercase => ["event.module"] }
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    # The corresponding winlogbeat index template should be deployed for the intrust-* index,
    # and the intrust-* index should be added to the SIEM indexes
    index => "intrust-%{+YYYY.MM.dd}"
  }
  ## Uncomment for debug purposes
  #stdout { codec => rubydebug }
}
```
This configuration file creates a listener on TCP port 55514 for Syslog data in RFC 5424 format. It is followed by a parsing section for the RFC 5424 fields and some transformation steps that optimize InTrust data for ELK, including, for example, the conversion of InTrust alerts into Elastic SIEM alerts.
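To build intuition for what the grok pattern and the embedded Ruby code are doing, here is a rough Python sketch of the same RFC 5424 parsing. The regexes here are simplified illustrations rather than the exact patterns the pipeline uses, and the sample field names mirror the InTrust structured-data IDs seen above:

```python
import re

# Simplified RFC 5424 header: <pri>version timestamp host app procid msgid SD [msg]
HEADER = re.compile(
    r'^<(?P<pri>\d+)>(?P<version>\d+) (?P<timestamp>\S+) (?P<host>\S+) '
    r'(?P<app>\S+) (?P<procid>\S+) (?P<msgid>\S+) '
    r'(?P<sd>-|(?:\[.*?[^\\]\])+)(?: (?P<msg>.*))?$',
    re.DOTALL,
)

def parse_rfc5424(line):
    """Split an RFC 5424 line into header fields and a structured-data dict,
    roughly as the grok pattern plus the Ruby extract_syslog5424_sd do."""
    m = HEADER.match(line)
    if not m:
        return None
    event = m.groupdict()
    sd = {}
    if event['sd'] != '-':
        # Each [SD-ID key="value" ...] element becomes a nested dict
        for element in re.findall(r'\[(.*?[^\\])\]', event['sd'], re.DOTALL):
            sd_id, _, params = element.partition(' ')
            sd_id = sd_id.split('@', 1)[0]
            sd[sd_id] = dict(re.findall(r'(\S+)="((?:[^"\\]|\\.)*)"', params))
    event['sd'] = sd
    return event
```

For example, a message carrying `[InTrust.Predefined Category="Logon" EventID="4624"]` yields `event['sd']['InTrust.Predefined']['EventID'] == '4624'`, which is exactly the kind of nested field the mutate blocks then map onto `winlog.event_id` and friends.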
On the InTrust side, all we need to do is specify forwarding settings on the desired repository. Use the TCP connection type and the Syslog RFC 5424 message format, and specify filters if necessary. With a large event flow, it is highly recommended not to send everything into your SIEM system; for alerts to work correctly, make sure you have selected InTrust Alerts in the forwarding filters.
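Before pointing InTrust at Logstash, you can sanity-check the listener with a hand-crafted event. This is a hedged Python sketch: the host name and field values are made up, and it assumes the tcp input from intrust.conf above is already running:

```python
import socket
from datetime import datetime, timezone

def build_test_event(host="dc1.example.local"):
    """Craft a minimal RFC 5424 message shaped like InTrust forwarding
    output; the structured-data IDs mirror what the Logstash filter expects."""
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
    sd = ('[InTrust.Predefined Category="Logon/Logoff" EventID="4624" '
          'ComputerName="{0}"]'.format(host))
    return '<14>1 {0} {1} InTrust - - {2} Test logon event'.format(ts, host, sd)

def send_test_event(logstash_host="localhost", port=55514):
    # Assumes the Logstash tcp input from intrust.conf is listening
    with socket.create_connection((logstash_host, port)) as s:
        s.sendall((build_test_event() + "\n").encode("utf-8"))
```

After calling `send_test_event()`, the event should show up in the intrust-* index in Kibana Discover within the pipeline's flush interval.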
When you see messages being forwarded to the destination, check the Kibana side. If everything works correctly, you should see something like this:
Besides this nice data aggregation, filtering, and ingestion, using InTrust with ELK provides the following capabilities:
- A highly efficient compliance repository with up to a 40:1 compression ratio (20:1 with indexing, the default compression)
- Automatic deployment of the InTrust agent in Windows and cloud (using Azure Pack) infrastructures
- Response Actions - the ability to automatically respond to events and event combinations with Alerts and scripted response actions
- Built-in security alerts - suspicious process, suspicious PowerShell, privileged logins, ransomware protection for file servers (using Change Auditor), and others
- Full-text search across all data using information about user and computer accounts from AD (using Enterprise Reporter and IT Security Search)
- Native Windows event normalization into Who, What, Where, When, Whom, and Where From
- Automatic intelligent Syslog parsing (InTrust automatically detects key-value pairs in incoming Syslog messages)
- SQL Server data import, which provides a way to analyze data in SQL, SSRS, and Power BI
Read this white paper to learn how to dramatically improve your SIEM's ROI and threat-hunting potential by adopting a better event log management model. Paired with Quest InTrust, you can collect more data, archive that data cost-effectively for 10 years or more, improve threat detection, and slash costs by feeding your SIEM a lower volume of higher-quality data.