Piped Commands

The piped commands syntax allows composing queries by chaining commands one after another, directing the output of each command to the input of the next, just like UNIX command piping. The choice and ordering of commands have almost no restrictions except one: the command chain must begin with a record stream source. Commands are always processed from left to right.

Commands are separated by a pipe ( | ) or a dot ( . ), which redirects the output of one command to the input of the next. In the absence of a redirector, the output of the last command is sent to the SpectX result screen.
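For instance, a minimal chain using the dual, select and limit commands described in the sections below:

dual(10)        // the chain begins with a record stream source
 .select(i)     // dot redirects the output to the next command
 | limit(3)     // pipe does exactly the same
;               // no redirector: the result is sent to the screen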

Aliasing

The default syntax for declaring an alias, both for record streams and for expressions, is:

{record_stream | expression} AS alias

However, expressions may also be aliased like this:

alias ':' expression

The latter syntax is used in the grammar definitions and examples of this piped commands query reference.
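For instance, a minimal sketch showing both forms side by side (using the dual source command described below):

dual(5) AS src            // a record stream aliased with AS
 .select(
   myInt: i               // colon syntax aliasing an expression
   ,i + 1 AS nextInt      // AS syntax aliasing an expression
 );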

Specifying Source

SpectX queries can select from record streams:

{ record_stream_create_cmd | '@'stream_ref | '@' '[' results_file ']' | '@' '[' script_file ']' } [AS alias]

where:

  • record_stream_create_cmd is one of the record stream create commands: LIST, PARSE, SOURCE, DUAL, GREP, VALUES
  • stream_ref is a variable referring to a record stream source
  • results_file is the filename of a stored record stream result, enclosed in square brackets. The filename may include an absolute or relative path.
  • script_file is the filename of a query script, enclosed in square brackets. The filename may include an absolute or relative path.
  • alias - you can assign the source stream an alias, which can be used as a reference within the query statement. The AS keyword is mandatory.

Note that references used in SpectX queries must have a type assigned; therefore stream_ref, results_file and script_file must be prepended with @.

Example 1. DUAL command as data source:

/* Since we are not redirecting output to next command
the result is sent to screen */

dual(2);    // generate 2 records using dual

Example 2. record stream reference as data source:

@intStr = dual(10).select(i);

@intStr
 .filter(i > 5)     // commands can be separated by dot
 | limit(2)         // or pipe
;

Example 3. Using another script (a view) as data source:

@[/user/examples/views/my_webserver_access_logs.sx]
 .limit(100);

Selecting fields

select( [ alias':' ]
        { *
          | alias.*
          | '@'position
          | function
          | expression
          | literal
        }
        [ , ... ]
)

The select command performs the same operation as the SQL-style SELECT clause. The list of comma-separated expressions can contain literals, function calls, field names, and combinations thereof. The list can also contain the wildcard symbol to select multiple fields.

Example 4.

dual(100)               // generate 100 records using dual
 .select(
  i                     // select by field name
  ,myLong:@2            // select long value by position, assign alias
  ,mySum:(i + myLong)   // sum int and long, assign alias
 )
 .filter(myLong > 10)   // commands can be separated by dot
 | sort(mySum)          // or pipe
 .limit(10);

UnionAll

Similarly to SQL, you can combine the rows of two datasets into one resultset with the unionAll command:

unionAll( src [ ,... ] )

    src:
    { record_stream_create_cmd | '@'stream_ref | '@' '[' results_file ']' | '@' '[' script_file ']' } [AS alias]

Example 5.

@s1 = VALUES(i:1, d:2.2, t:NOW()); //create a stream using VALUES cmd

@s1
 .unionall(dual(20,10))            //append to @s1 10 rows generated from dual
 .select(i, s, ip);

Join

The join command performs an inner join operation (same as the SQL-style JOIN clause):

join( {'@'stream_ref | '@' '[' results_file ']' | '@' '[' script_file ']'} [AS alias] ON join_condition )

join_condition:
field_from_left_side_data = field_from_right_side_data [ {AND | OR} boolean_expr ,... ]

Note that assigning an alias is optional:

  • when it is omitted, the default aliases left and right are assigned to the left and right streams respectively
  • fields with matching names are renamed by prepending left_ or right_ to the field name, according to the originating stream
  • fields with different names keep their names in the resultset

The join_condition allows you to specify additional conditions based on boolean expressions.

You can chain as many different join commands as you like.

Example 6:

@ipInfo = dual(0,100).select(ip, cc(ip + 19.54.33.154));

dual(100).select(i, t as time, ip)               //neither the first nor the second stream in the join has an alias defined
 .join(@ipInfo on left.ip = right.ip             //hence they get assigned default aliases
                  AND left.time < now()[+20 ms]) //you can use additional join conditions too
                                                 //note that the third joined stream has an alias defined
 .join(@[/user/examples/doc/query_lang/query_reference/pipe_style/example4.sx] as sumInfo on left.i = sumInfo.i)
;

Grouping and Aggregation

group( { field_name | '@'position | alias | expression [ gsort_opt ] } [, ... ] )

gsort_opt:
        {   SORT { field_name | '@'position | alias | expression } [ASC | DESC]
          | NOSORT
        }

The group command performs the same operation as the SQL-style GROUP BY clause, i.e. it defines the scope of the aggregation computation. The grouping options are also the same:

  1. grouping by all non-aggregated fields (in the select clause) computes aggregations for all unique value combinations of the grouped fields
  2. grouping by some of the non-aggregated fields (in the select clause) also computes aggregations for all unique value combinations of the grouped fields. The values of the remaining non-aggregated fields in the resultset are undetermined (i.e. their ordering is not determined).
  3. when the group command is omitted, the resultset contains one computed aggregation for the whole selected dataset (which may be restricted by an earlier filter command). The values of non-aggregated fields in the resultset are undetermined.

NB! For grouping to take effect, you must place the group command immediately after the select command containing the aggregation function.

Example 7. Getting the top 5 ip-addresses generating access denied requests:

@[/user/examples/views/my_webserver_access_logs.sx]
 .filter(response=404)
 .select(clientIp, cnt:count(*))
 .group(clientIp)
 .sort(cnt DESC)
 .limit(5);

Example 8. Computing the top 5 ip-address and user-agent pairs:

@[/user/examples/views/my_webserver_access_logs.sx]
 .filter(response != 404)
 .select(clientIp, agent, cnt:count(*))
 .group(@1, @2)                             // grouping fields referred by positions
 .sort(cnt DESC)
 .limit(5);

Example 9. Omitting group yields count(*) being computed over the whole selected dataset:

@[/user/examples/views/my_webserver_access_logs.sx]
 .filter(response != 404)
 .select(clientIp, agent, cnt:count(*))
;

Grouping has its own private sorting option, allowing you to control the behaviour of the sorting which forms the basis of grouping. By default the dataset is sorted by the group key(s) in their natural, i.e. increasing, order. Then the aggregation function is applied. When the group key changes, the aggregate value for that key is emitted and the aggregate function is reset. The process continues until the end of the sorted dataset.

With group sort, the sort key is applied independently of the group key: while the group key remains the same, the dataset gets sorted by the sort key instead. The aggregates are still computed by watching the group key in the sorted dataset: when it changes, the computed aggregate value is emitted and the aggregate function is reset. This allows you to compute aggregates on a completely different set of groups, which sometimes becomes very useful. Take for instance computing aggregates on noncontiguous ranges.

Example 10. Computing the IPv4 address ranges of the US requires applying the MIN() and MAX() aggregate functions over IP addresses grouped by country code. With default sorting we would get just the beginning of the lowest and the end of the highest US range:

dual(16909050, 86231040)  //evaluate over a space of the first 10 US ip ranges
  .select(
     ipCC:cc(ip)         //compute ipv4 country code
     ,min(ip)            //begin of range
     ,max(ip)            //end of range
  )
  .group(ipCC)           //compute min max for each ip country
  .filter(ipCC='US')     //keep only rows where the country is US
;
ipCC  min      max
US    1.2.3.0  220.232.59.132

However, when we change group sort to NOSORT (i.e. leave the dataset unsorted) we get all 20051 IPv4 ranges of the US, since the addresses in the dataset are in increasing order. Assuming that the ranges are contiguous, the group key changes appear at the borders of the ranges and we get min and max computed exactly there:

Example 11.

dual(16909050, 86231040)  //evaluate over a space of the first 10 US ip ranges
 .select(
   ipCC:cc(ip)          //compute ipv4 country code
   ,min(ip)             //begin of range
   ,max(ip)             //end of range
 )
 .group(ipCC NOSORT)    //compute min max on unsorted dataset
 .filter(ipCC='US')     //keep only rows where the country is US
;
ipCC  min        max
US    1.2.3.0    1.2.3.255
US    3.0.0.0    4.16.47.255
US    4.16.56.0  4.16.145.255
...

Filtering fields

filter(boolean_expr [ ,... ])

boolean_expr:
  {  expression_returning_boolean_value
   | boolean_expr AND [NOT] boolean_expr
   | boolean_expr OR [NOT] boolean_expr
   | NOT boolean_expr
  }

The filter command performs the same operation as the SQL-style WHERE clause. Boolean-returning expressions are given as arguments to the command. Multiple expressions can be combined using the boolean AND and OR operators, optionally enclosed in parentheses.

Example 12.

dual(16909060,10000)
 .select(ip, s)
 .filter(cc(ip)='US' OR cc(ip)='CA')
;

Sorting

sort( {field_name | '@'position | alias | expression } [ASC | DESC] [ ,... ] )

The sort command performs the same operation as the SQL-style ORDER BY clause. The field names, positions or aliases to sort by are given as arguments to the command. To sort by multiple keys, separate them with commas; the list is processed from left to right. Use ASC (ascending) or DESC (descending) to specify the sort direction; ASC is the default. Each sort key can have its own direction assigned.

See examples above.
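In addition, a minimal standalone sketch sorting by two keys with different directions, using dual as the data source:

dual(20)
 .select(i, ip)
 .sort(ip ASC, i DESC)   // primary key ascending, secondary key descending
;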

Limiting resultset

limit( [offset,] { rowcount | expression } )

The limit command performs the same operation as the SQL-style LIMIT clause: it limits the dataset passed to it to rowcount rows. You can also specify an offset, which causes the specified number of rows at the beginning to be omitted.

See examples above.
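In addition, a minimal sketch using both offset and rowcount (skip the first 10 generated rows, then return the next 5):

dual(100)
 .select(i)
 .limit(10, 5);   // offset 10, rowcount 5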

Saving resultset

A resultset from a query can be persisted in the following ways:

1. Stored in SpectX as table

save( table_name::STRING [, overwrite::BOOLEAN ] )

    table_name - name of stored resultset (enclosed in single or double quotes). Must have *.sxt extension appended.
    overwrite - enables/disables overwriting an existing resultset with the same table_name. Optional. Default value
                is false.

On successful execution, the save command outputs:

  • store_path - name and path of saved file
  • rows - total count of records saved
  • bytes - size of saved resultset
  • field_count - count of fields in the resultset

Saved results can be used as a source in any query script.

Example 13.

dual(10)
 .select(i)
 .save('/user/my_ten_cents.sxt', true);
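
The saved table can then be referenced as a stored resultset source; for instance, a minimal sketch reading back the table saved above:

@[/user/my_ten_cents.sxt]
 .limit(5);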

2. Stored in a relational database table

jdbc_insert( uri::STRING, table::STRING [,primaryKey::ARRAY] [,username::STRING] [,password::STRING] )
jdbc_update( uri::STRING, table::STRING  ,whereFields::ARRAY [,username::STRING] [,password::STRING] )
jdbc_delete( uri::STRING, table::STRING  ,whereFields::ARRAY [,username::STRING] [,password::STRING] )
jdbc_query ( uri::STRING ,query::STRING [,username::STRING] [,password::STRING] )

where:
    uri - Jdbc connection string. The format of the string depends on the particular Jdbc driver of the database.
    table - name of the destination table.
    username - username of the database user. Optional.
    password - password of the database user. Optional.
    primaryKey - field names of the primary key (expressed as an array of strings). Optional.
    whereFields - names of the fields used for constructing the WHERE clause. Expressed as an array of strings.
    query - a create, insert, update or delete SQL query.

jdbc_insert constructs and executes an SQL INSERT statement for each record in the input record stream. It creates the table if it does not exist.

jdbc_update constructs and executes an SQL UPDATE statement for each record in the input record stream. The field names specified in the whereFields array, and their respective values in the record, are used to construct the WHERE clause.

jdbc_delete constructs and executes an SQL DELETE statement for each record in the input record stream. The field names specified in the whereFields array, and their respective values in the record, are used to construct the WHERE clause.

jdbc_query executes the create, insert, update or delete statement specified in query.

Note that you need to enable the respective Jdbc drivers in the configuration.

On successful execution, all jdbc_ pipe commands output the following:

  • uri - Jdbc connection uri to the database
  • table - affected table
  • rows - number of rows affected by operation
  • field_count - input field count

Example 14. Store 10 generated records in an sqlite database table test using jdbc_insert command:

/* NB! In order for this example to work:
 1) sqlite Jdbc driver must be enabled in SpectX configuration and
 2) an sqlite database must exist in the path specified by jdbc connection string
*/

dual(10)
 .select(t as time, ip, s as description)
 .jdbc_insert(uri:'jdbc:sqlite:file:/tmp/test.db'   //test.db must exist in /tmp
              ,table:'test'                         //insert creates table 'test' if it does not exist with
              ,primaryKey:['time']                  //primary key using field 'time'
);
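
Along the same lines, a sketch of jdbc_update under the same assumptions (the table test created by the jdbc_insert above); here the WHERE clause is constructed from the time field:

dual(10)
 .select(t as time, ip, s as description)
 .jdbc_update(uri:'jdbc:sqlite:file:/tmp/test.db'   //same database as in Example 14
              ,table:'test'
              ,whereFields:['time']                 //WHERE clause is built from field 'time'
);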

3. Stored in Elasticsearch index

save_es( uri::STRING, index::STRING
         [ ,type::STRING ]
         [ ,_insecure_tls::BOOLEAN ]
         [ ,_rows_per_batch::INTEGER ]
         [ ,credentials:{...} ]
)

where:

  • uri - Elasticsearch endpoint root uri.

  • index - index(es) to write to. Comma separated (no whitespaces), wildcards allowed.

  • type - Elasticsearch index mapping type. Required for Elasticsearch versions prior to 6.0.0.

  • _insecure_tls - whether to skip server certificate chain & host validation, default false. Optional.

  • _rows_per_batch - how many rows to send per batched request to ES, default 1000. Optional.

  • credentials - authentication attributes for different schemes:

    • credentials:{type:'basic', user::STRING, password::STRING}
    • credentials:{type:'xpack', user::STRING, password::STRING}
    • credentials:{type:'token', token::STRING} - OAuth2 Bearer token obtained via Get token API
    • credentials:{type:'aws', accessKeyId::STRING, secretKey::STRING, region::STRING} - AWS IAM user credentials for using Elasticsearch Service
    • credentials:{type:'ec2'} - use when accessing AWS Elasticsearch Service from AWS EC2 role (credentials are retrieved from instance metadata)

When Elasticsearch is configured to accept anonymous commands, credentials can be omitted.

Example 15. Insert 100 records of generated data to Elasticsearch index example:

/* NB! In order for this example to work, an Elasticsearch 6.0 (or later) instance must be running at localhost,
       accepting anonymous commands */

dual(100)
 .select(time:t, desc:s, ipaddr:ip)
 .save_es(uri:"http://127.0.0.1:9200", index:"example")
;
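
If the cluster requires authentication, the credentials attribute can be supplied; a minimal sketch assuming basic authentication (the user and password values are placeholders):

dual(100)
 .select(time:t, desc:s, ipaddr:ip)
 .save_es(uri:"http://127.0.0.1:9200", index:"example"
          ,credentials:{type:'basic', user:'elastic', password:'changeme'})
;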

Grammar

Note that the commands following the data source can appear in any order (except group(), which, if present, has to follow the select command).

record_stream [ [AS] alias ]
{ '.' | '|' }
select( [ alias':' ] { * | alias.* | '@'position | function | expression | literal } [ , ... ] )
{ '.' | '|' }
[ unionAll( record_stream [ ,... ] ) ]
{ '.' | '|' }
[ join( {'@'stream_ref | '@' '[' results_file ']' | '@' '[' script_file ']'} [AS alias] ON join_condition ) ]
{ '.' | '|' }
[ group( group_args ) ]
{ '.' | '|' }
[ filter( boolean_expr [ ,... ] ) ]
{ '.' | '|' }
[ sort( {field_name | '@'position | alias | expression } [ASC | DESC] [ ,... ] ) ]
{ '.' | '|' }
[ limit( [offset,] { rowcount | expression } ) ]
{ '.' | '|' }
[ save( table_name [, overwrite ] ) ]
{ '.' | '|' }
[ jdbc_insert( uri, table [,primaryKey] [,username] [,password] ) ]
{ '.' | '|' }
[ jdbc_update( uri, table ,whereFields [,username] [,password] ) ]
{ '.' | '|' }
[ jdbc_delete( uri, table, whereFields [,username] [,password] ) ]
{ '.' | '|' }
[ jdbc_query( uri, query [,username] [,password] ) ]
{ '.' | '|' }
[ save_es(uri, index [,type] [,_insecure_tls] [,_rows_per_batch] [,credentials] ) ]

record_stream:
    { record_stream_create_cmd | '@'stream_ref | '@' '[' results_file ']' | '@' '[' script_file ']' }

join_condition:
field_from_left_side_data = field_from_right_side_data [ {AND | OR} boolean_expr ,... ]

group_args:
    { field_name | '@'position | alias | expression [ gsort_opt ] } [, ... ]

    gsort_opt:
            {   SORT { field_name | '@'position | alias | expression } [ASC | DESC]
              | NOSORT
            }

boolean_expr:
  {  expression_returning_boolean_value
   | boolean_expr AND [NOT] boolean_expr
   | boolean_expr OR [NOT] boolean_expr
   | NOT boolean_expr
  }

overwrite:
    true | false