Normalising LDAP Log Records

User-activity records in OpenLDAP (slapd) logs have a seemingly logical structure. Each record represents an event, and the records belonging to one session are bound together by their conn= value:

Dec 31 16:53:53 server1 slapd[1010]: conn=7448 fd=43 connection from IP=192.168.4.36:40629 (IP=:: 389) accepted.
Dec 31 16:53:53 server1 slapd[1010]: conn=7448 op=0 BIND dn="uid=user1,ou=people,dc=example,dc=com" method=128
Dec 31 16:53:53 server1 slapd[1010]: conn=7448 op=0 RESULT tag=97 err=0 text=
Dec 31 16:53:53 server1 slapd[1010]: conn=7448 op=1 SRCH base="ou=people,dc=example,dc=com" scope=2 filter="(objectClass=*)"
Dec 31 16:53:53 server1 slapd[1010]: conn=7448 op=1 SEARCH RESULT tag=101 err=0 text=
Dec 31 16:53:54 server1 slapd[1010]: conn=7448 op=2 UNBIND
Dec 31 16:53:54 server1 slapd[1010]: conn=-1 fd=43 closed

Annoyingly, the IP address a session originates from appears in only one record of the session: the connection record. This makes retrieving sessions by IP a two-phase process. First you retrieve the session id (conn=), and then you retrieve the session's requests. Queries would be much simpler if the originating IP address were attached to every session event. We can do that by selecting the connect records as a separate stream and joining that stream with the rest of the records on the session id.
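To make the two-stream idea concrete outside SpectX, here is a minimal Python sketch. It is not the SpectX script. It assumes IPv4 client addresses and uses hypothetical regexes and a hypothetical `normalize` function. It builds a connId → client-socket map from the connect records, then attaches the socket to every operation record with the same connId. This is an inner join, so operations whose connect record is missing are dropped.

```python
import re

# A few slapd records taken from the excerpt above.
LOG = """\
Dec 31 16:53:53 server1 slapd[1010]: conn=7448 fd=43 connection from IP=192.168.4.36:40629 (IP=:: 389) accepted.
Dec 31 16:53:53 server1 slapd[1010]: conn=7448 op=0 BIND dn="uid=user1,ou=people,dc=example,dc=com" method=128
Dec 31 16:53:53 server1 slapd[1010]: conn=7448 op=1 SRCH base="ou=people,dc=example,dc=com" scope=2 filter="(objectClass=*)"
Dec 31 16:53:54 server1 slapd[1010]: conn=7448 op=2 UNBIND
"""

# Hypothetical regexes; IPv4 client addresses are assumed.
CONNECT_RE = re.compile(r"conn=(\d+) fd=\d+ connection from IP=([\d.]+:\d+)")
OP_RE = re.compile(r"conn=(\d+) op=(\d+) (\w+)")


def normalize(lines):
    """Attach the originating client socket (c_sock) to every op record."""
    # Phase 1: the "connect stream", mapping connId -> client socket.
    conn = {}
    for line in lines:
        m = CONNECT_RE.search(line)
        if m:
            conn[m.group(1)] = m.group(2)
    # Phase 2: inner join of op records with the map on connId.
    out = []
    for line in lines:
        m = OP_RE.search(line)
        if m and m.group(1) in conn:
            out.append({"connId": m.group(1), "c_sock": conn[m.group(1)],
                        "opId": int(m.group(2)), "op": m.group(3)})
    return out


for rec in normalize(LOG.splitlines()):
    print(rec)
```

Every printed record now carries `c_sock` = `192.168.4.36:40629`, so a lookup by IP no longer needs a second pass over the log.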

The first thing to do is to develop the pattern that parses the records. With that done, we can go straight to writing the query script, which you can find at https://github.com/spectx/resources/blob/master/examples/queries/ldap_logs.sx.
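The real pattern is the SpectX pattern file ldap.sxp, fetched by the script. For readers who want a feel for what it has to extract, below is a rough Python regex approximation. It is hypothetical and is not a translation of ldap.sxp. It reuses the field names that appear in the script (`timestamp`, `type`, `connId`, `c_sock`, `opId`, `op`, `details`). The values of `type` ("connect", "op", "other") and the way `SEARCH RESULT` is kept as one operation name are assumptions made only for illustration.

```python
import re

# Hypothetical regex approximation of the slapd record layout (NOT ldap.sxp).
HEADER = (r"(?P<timestamp>\w{3} +\d{1,2} \d{2}:\d{2}:\d{2}) (?P<host>\S+) "
          r"slapd\[\d+\]: conn=(?P<connId>-?\d+) ")
CONNECT = re.compile(HEADER + r"fd=\d+ connection from IP=(?P<c_sock>\S+) ")
OP = re.compile(HEADER + r"op=(?P<opId>\d+) (?P<op>[A-Z]+(?: RESULT)?) ?(?P<details>.*)$")


def parse(line):
    """Classify one record and extract its fields into a dict."""
    m = CONNECT.match(line)
    if m:
        return {"type": "connect", **m.groupdict()}
    m = OP.match(line)
    if m:
        return {"type": "op", **m.groupdict()}
    # e.g. "conn=-1 fd=43 closed" falls through to here
    return {"type": "other", "raw": line}


print(parse("Dec 31 16:53:53 server1 slapd[1010]: conn=7448 op=1 SEARCH RESULT tag=101 err=0 text="))
```

Each record is classified once. The `type` field then plays the role that `filter(type = 'connect')` and `filter(type = 'op')` rely on in the script.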

@src = LIST(src:'s3s://sample-data/formats/log/ldap/$yyyy$-$MM$-$dd$_openldap.log')
| PARSE(pattern:FETCH('https://raw.githubusercontent.com/spectx/resources/master/examples/patterns/ldap.sxp'));

// create the stream with only connect records in it
@conn = @src | filter(type = 'connect') | select(connId, c_sock);

// normalize the op records by joining ip address from connect records:
@src
| filter(type = 'op')                        // we're only interested in operation records
| join(@conn on left.connId = right.connId)  // join the connection records
| select(timestamp,                          // select relevant fields
         right.c_sock as c_sock,
         connId,
         opId,
         op,
         details
);

Let’s save it as /user/ldap_view.sx. We now have a view (a script producing normalized fields) that we can use in further analysis of user activity. For example, let’s find the top 10 IP addresses making search queries:

@[/user/ldap_view.sx]
| select(IPADDR(c_sock), cnt:count(op='SRCH'))
| group(@1)
| sort(cnt DESC)
| limit(10)
;
ipaddr          cnt
192.168.2.104   3
192.168.4.36    1
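The same aggregation (group by client IP, count SRCH operations, sort descending, keep 10) can be sketched in Python over normalized records shaped like those produced by the view. The records and the function name `top_search_ips` are hypothetical. Mirroring `count(op='SRCH')` grouped per IP, an IP that made no searches still appears with a count of 0. The port is split off `c_sock` before conversion, which plays the part of `IPADDR(c_sock)` here.

```python
import ipaddress
from collections import Counter


def top_search_ips(records, n=10):
    """Return the n client IPs with the most SRCH operations, as (ip, count)."""
    counts = Counter()
    for r in records:
        host, _, _port = r["c_sock"].rpartition(":")  # drop the client port
        # Add 1 for a search, 0 otherwise, so every IP gets a group.
        counts[ipaddress.ip_address(host)] += 1 if r["op"] == "SRCH" else 0
    return counts.most_common(n)  # sorted by count, descending


# Normalized records for the session shown at the top of the article.
records = [
    {"c_sock": "192.168.4.36:40629", "op": "BIND"},
    {"c_sock": "192.168.4.36:40629", "op": "SRCH"},
    {"c_sock": "192.168.4.36:40629", "op": "UNBIND"},
]
print(top_search_ips(records))
```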

Note

You may have noticed that the timestamps in the records do not specify a year. This is especially problematic when you need to go through logs spanning several years. SpectX has built-in support for adjusting timestamps based on the file’s path_time or last_modified timestamp (the SpectX documentation describes this in more detail). In this example we used URI time patterns ($yyyy$-$MM$-$dd$) in the LIST command to initialize path_time.
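The general idea behind path_time can be illustrated with a small Python sketch. It takes the year from the date in the file name and applies it to the year-less syslog timestamp. This is not how SpectX implements the adjustment. The helper names `path_date` and `with_year`, the example file name, and the year-rollover rule are all hypothetical. The rule assumes that a record month later than the file's month (a December record in a file dated January) belongs to the previous year.

```python
import re
from datetime import date, datetime


def path_date(path):
    """Extract a YYYY-MM-DD date from a log file path (hypothetical helper)."""
    m = re.search(r"(\d{4})-(\d{2})-(\d{2})", path)
    if m is None:
        raise ValueError(f"no date in path: {path}")
    return date(int(m.group(1)), int(m.group(2)), int(m.group(3)))


def with_year(syslog_ts, pdate):
    """Give a year-less syslog timestamp ("Dec 31 16:53:53") a year.

    Assumed heuristic: use the path year, but if the record's month is later
    than the path month, the record is taken to be from the previous year.
    """
    ts = datetime.strptime(f"{pdate.year} {syslog_ts}", "%Y %b %d %H:%M:%S")
    if ts.month > pdate.month:
        ts = ts.replace(year=pdate.year - 1)
    return ts


# Hypothetical file name following the $yyyy$-$MM$-$dd$_openldap.log layout.
print(with_year("Dec 31 16:53:53", path_date("2019-12-31_openldap.log")))
```

The rollover rule is only a heuristic. Logs that keep records more than a year past the file's date would need a different anchor, such as last_modified.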