FortiGate Traffic Log

The FortiGate traffic log (downloaded as text) is an example of a log that consists of a single record type with a variable number of unordered key-value pair fields, separated by one or more spaces.

Example:

date=2019-03-31 time=06:42:54 eventtime=1554039772 srcip=172.16.200.55 app="Web Mgmt" utmref=65162-7772

Hint

You can find a sample log file by navigating in the Input Data Browser to s3s://spectx-docs/formats/log/fortigate/FortiGate-6.2.0.traffic.log

Parse

Because the number of key-value pair fields and their order vary between records, the records cannot be matched by a static pattern. Instead, we use KVP, which parses key-value pairs as they appear at runtime. We need to supply a pattern describing the key, the value, the separator between key and value, and the separator between pairs. KVP applies this pattern repeatedly until it fails to match or until the specified maximum number of matches is reached. The extracted key-value pairs are captured in a VARIANT_OBJECT.

Here’s the pattern:

1
2
3
4
5
6
KVP{
 LD:key '='
 (DQS:valueDqstr | IPADDR:valueIp | LONG:valueInt | LD:valueStr)
 (' '+ | >>EOL)
}:attr
EOL

where line:

  2. Extracts the key as a string using LD, which matches all characters up to the key-value separator =.

  3. Matches the value, which may appear as a double-quoted string, an IP address, a number, or an unquoted string (containing no spaces). Hence we use an alternatives group to match each form accordingly.

    Note

    The order of choices in the alternatives group matters! LD, which captures the unquoted string, must come last, since the alternatives group uses lazy matching, i.e. it accepts the first choice that matches (as opposed to the best match).

  4. Matches the separator between pairs. Key-value pairs are separated by a space, except for the last pair, which is followed immediately by a line break. Hence we need to match either a space or the line break. But since the line break also terminates the record, we use a lookahead (>>) to leave it for EOL on line 6 to consume. At this point the last key-value pair has been extracted and the parser is still positioned at the line break. On the next iteration KVP fails to match, which signals it to exit, and the parsing engine proceeds with the next matcher in the pattern, on line 6.

    Note

    Some records contain more than one space between key-value pairs, hence the quantifier +, which matches one or more space characters.

  5. Specifies the export name for the KVP output structure.

  6. Matches the line break that terminates the record.
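To make the first-match rule of the alternatives group concrete, here is a minimal Python sketch. It is an illustration only and not how SpectX is implemented: the names MATCHERS and classify are hypothetical, and the sketch classifies an already isolated value token, whereas SpectX matches at the current position in the input.

```python
import re
from ipaddress import ip_address


def _is_ip(text):
    """True if text is a valid IPv4 or IPv6 address."""
    try:
        ip_address(text)
        return True
    except ValueError:
        return False


# Hypothetical stand-ins for the choices in the alternatives group,
# ordered from most to least specific. The catch-all string comes last.
MATCHERS = [
    ("valueDqstr", lambda v: len(v) >= 2 and v[0] == v[-1] == '"'),
    ("valueIp", _is_ip),
    ("valueInt", lambda v: re.fullmatch(r"[+-]?\d+", v) is not None),
    ("valueStr", lambda v: True),
]


def classify(value, matchers=MATCHERS):
    """Return the name of the first matcher that accepts the value."""
    for name, accepts in matchers:
        if accepts(value):
            return name
    return None


# Moving the catch-all to the front makes it win every time,
# so no value is ever recognised as an IP address or a number.
CATCH_ALL_FIRST = [MATCHERS[-1]] + MATCHERS[:-1]
```

With the original order, classify('172.16.200.55') yields valueIp and classify('65162-7772') falls through to valueStr. With CATCH_ALL_FIRST, every value is reported as valueStr, which is why the unquoted string must be the last choice.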

Parsing the example row above with this pattern results in:

attr
{"date":2019-03-31,"time":06:42:54,"eventtime":1554039772,"srcip":172.16.200.55,"app":Web Mgmt,"utmref":65162-7772}
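The way KVP repeats one pair pattern until it no longer matches can be sketched in Python as follows. This is a rough analogy under stated assumptions, not the SpectX engine: PAIR, parse_kvp and max_pairs are hypothetical names, and all values are kept as plain strings rather than typed fields.

```python
import re

# One key-value pair: key, '=', a double-quoted or bare value, then either
# one or more spaces or a lookahead for the line break (which is not consumed).
PAIR = re.compile(r'(?P<key>[^=\s]+)=(?P<value>"[^"]*"|[^\s"]*)(?: +|(?=\r?\n|$))')


def parse_kvp(line, max_pairs=None):
    """Apply PAIR repeatedly from the start of line until it stops matching.

    Returns the collected pairs and the position where matching stopped.
    """
    fields, pos, count = {}, 0, 0
    while max_pairs is None or count < max_pairs:
        match = PAIR.match(line, pos)
        if match is None:
            break  # no further pair here: leave the rest to the caller
        fields[match.group("key")] = match.group("value").strip('"')
        pos = match.end()
        count += 1
    return fields, pos


line = ('date=2019-03-31 time=06:42:54 eventtime=1554039772 '
        'srcip=172.16.200.55 app="Web Mgmt" utmref=65162-7772\n')
fields, pos = parse_kvp(line)
```

After the call, fields holds the six pairs of the example row and pos still points at the trailing line break, which mirrors how the lookahead leaves the line break for EOL to consume.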

Normalizing Query

Although we have successfully parsed all the fields in the records, you may notice that the date and time fields are represented as strings and that eventtime is a number. We need to convert them to the TIMESTAMP type before we can use date-time functions on them.

Also, a certain set of fields is common to all records. It would be convenient to have them as resultset fields, converted to their intended data types.

We can do all of this with a simple query. When we save it in the resource tree, it becomes a view from which the data can be selected.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
LIST(src:'s3s://spectx-docs/formats/log/fortigate/FortiGate-6.2.0.traffic.log')
| parse(pattern:FETCH('https://raw.githubusercontent.com/spectx/resources/master/examples/patterns/fortigate/fg_kvp.sxp'))
| select(
    timestamp:  PARSE("TIMESTAMP(tz='UTC'):timestamp", attr[date] || ' ' || attr[time]),
    log_id:     INT(attr[logid]),
    type:       STRING(attr[type]),
    sub_type:   STRING(attr[subtype]),
    level:      STRING(attr[level]),
    event_time: TIMESTAMP(INT(attr[eventtime])),
    vd:         STRING(attr[vd]),
    attr)
;

where line:

  1. Lists the source data file.
  2. Retrieves the content of the source data file and parses it according to the specified pattern.
  3. Starts the select statement, which creates a resultset of normalized fields.
  4. Creates a TIMESTAMP type field by concatenating attr[date] and attr[time] and parsing the resulting string with the TIMESTAMP matcher, interpreting it in the UTC timezone.
  5. Creates a field named log_id by converting attr[logid] to INTEGER type. This allows us to select records using numerical comparisons.
  6. Lines 6, 7, 8 and 10 create STRING type fields from the respective attr member fields.
  9. Creates a TIMESTAMP type field from the attr[eventtime] Unix epoch value.
  11. Includes attr in the resultset.
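For readers who want to see the conversions spelled out, the same normalization can be sketched in Python. This is only an analogy to the SpectX query: normalize is a hypothetical helper, attr is assumed to be a dictionary of raw strings, and eventtime is assumed to hold Unix epoch seconds, as it does in the example row.

```python
from datetime import datetime, timezone


def normalize(attr):
    """Build typed fields from a dict of raw strings and keep the original dict."""
    # Concatenate date and time, then interpret the result as UTC.
    local = datetime.strptime(f"{attr['date']} {attr['time']}", "%Y-%m-%d %H:%M:%S")
    return {
        "timestamp": local.replace(tzinfo=timezone.utc),
        # Numeric log_id allows numerical comparisons; absent keys become None.
        "log_id": int(attr["logid"]) if "logid" in attr else None,
        "type": attr.get("type"),
        "sub_type": attr.get("subtype"),
        "level": attr.get("level"),
        # Assumption: eventtime is a Unix epoch value in seconds.
        "event_time": datetime.fromtimestamp(int(attr["eventtime"]), tz=timezone.utc),
        "vd": attr.get("vd"),
        "attr": attr,
    }


row = normalize({"date": "2019-03-31", "time": "06:42:54", "eventtime": "1554039772"})
```

For the example row, row["timestamp"] is 2019-03-31 06:42:54 UTC and row["event_time"] is 2019-03-31 13:42:52 UTC. Fields that the example row does not contain, such as logid, come back as None.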

Here are the first 5 records of the resultset produced by the query above:

timestamp                      log_id     type     sub_type   level        event_time                     vd     attr
2019-05-10 11:37:47.000 +0000  13         traffic  forward    notice       2019-05-10 18:37:47.000 +0000  vdom1  {"date":20 …
2019-05-10 11:50:48.000 +0000  1000014    traffic  local      notice       2019-05-10 18:50:48.000 +0000  vdom1  {"date":20 …
2019-03-31 06:42:54.000 +0000  2000012    traffic  multicast  notice       2019-03-31 13:42:52.000 +0000  vdom1  {"date":20 …
2019-05-10 14:18:54.000 +0000  4000017    traffic  sniffer    notice       2019-05-10 21:18:54.000 +0000  root   {"date":20 …
2019-05-13 11:20:54.000 +0000  100032001  event    system     information  2019-05-13 18:20:54.000 +0000  vdom1  {"date":20 …

Hint

You can download the full code of the pattern and the view query at https://github.com/spectx/resources/tree/master/examples/patterns/fortigate