Quering Normalised LDAP Log Records¶
User activity related records in LDAP logs have seemingly logical structure, consisting of records representing events bound by conn= to a session:
Dec 31 16:53:53 server1 slapd: conn=7448 fd=43 connection from IP=192.168.4.36:40629 (IP=:: 389) accepted. Dec 31 16:53:53 server1 slapd: conn=7448 op=0 BIND dn="uid=user1,ou=people,dc=example,dc=com" method=128 Dec 31 16:53:53 server1 slapd: conn=7448 op=0 RESULT tag=97 err=0 text= Dec 31 16:53:53 server1 slapd: conn=7448 op=1 SRCH base="ou=people,dc=example,dc=com" scope=2 filter="(objectClass=*)" Dec 31 16:53:53 server1 slapd: conn=7448 op=1 SEARCH RESULT tag=101 err=0 text= Dec 31 16:53:54 server1 slapd: conn=7448 op=2 UNBIND Dec 31 16:53:54 server1 slapd: conn=-1 fd=43 closed
Annoyingly the ip-address where the session originates from is present only in one record of the session. This makes retrieving sessions by ip two phase process: first you need to retrieve session id and then the session requests. It would make queries much simpler, if we could attach originating ip-address to each session event. We can do that by selecting connect records as separate stream and join this with the rest of the records using session id.
What is even more annoying is that the timestamps in records may often turn out to be without year - this appears to be default OpenLDAP logging configuration. It’s especially bad when you need to go through logs spanning over several years. Luckily SpectX has built-in support for adjusting timestamps based on file’s path_time or last_modified timestamps (read more about this here).
First we declare PARSE statement to retrieve and parse data from log file. We’ll use already prepared pattern and example data from /user/examples/data/2015-12-31_openldap.log.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
// Note that since we're using time patterns in uri to select the source data // the evaluated path_time will be used to automatically adjust the record timestamps. @src = PARSE(pattern:$[/user/examples/patterns/ldap.sxp], src:'sx:/user/examples/data/$yyyy$-$MM$-$dd$_openldap.log'); // create the stream with connect records in it @conn = @src .filter(type = 'connect') .select(connId, c_sock); // normalized view of ldap op records: @op = @src .filter(type = 'op') //we're only interested in operation records .join(@conn on left.connId = right.connId) //join the connection records .select(timestamp, //note correct dateTime year values right.c_sock as c_sock, connId, opId, op, details );
Now we have a script with normalized fields, what we can use as a view in further analysis on user activities.
|2015-12-31 18:53:53||192.168.4.36:40629||7448||0||BIND||dn=’‘uid=user1,ou=people,dc=example,dc=com’’ method=128|
|2015-12-31 18:53:53||192.168.4.36:40629||7448||0||RESULT||tag=97 err=0 text=|
|2015-12-31 18:53:53||192.168.4.36:40629||7448||1||SRCH||base=’‘ou=people,dc=example,dc=com’’ scope=2 filter=’‘(objectClass=*)’‘|
|2015-12-31 18:53:53||192.168.4.36:40629||7448||1||SEARCH||RESULT tag=101 err=0 text=|
|2016-01-01 07:00:14||192.168.2.104:40648||7453||0||BIND||dn=’‘CN=MANAGER,DC=EXAMPLE,DC=COM’’ method=128|
|2016-01-01 07:00:14||192.168.2.104:40648||7453||0||RESULT||tag=97 err=0 text=|
|. . .|
For example we would want to know the IP-addresses making the most search queries:
1 2 3 4 5 6
// top 10 of ip-addresses making search queries: @op .select(IPADDR(c_sock), cnt:count(op='SRCH')) .group(@1) .sort(cnt DESC) .limit(10)