BigSnarf blog

Infosec FTW

Algebird for Infosec Analytics

Using Spark for real-time, large-scale log analysis → unified logging

Spark in an IPython Notebook, used for analytic workflows on auth.log.

What is really cool about this Spark platform is that I can either batch-process or data-mine the whole dataset on the cluster. Everything here is built on this idea. In the screenshot below, you can see me using my IPython Notebook for interactive queries. All the code I write to investigate auth.log can easily be converted to Spark Streaming DStream objects in Scala, so effectively I can build a real-time application on the same platform. “Cool” IMHO. In other posts I will document PySpark batch processing and DStream algorithms in Scala for auth.log. These are some of the items I am filtering in my PySpark interactive queries in the Notebook:

Successful user login: “Accepted password”, “Accepted publickey”, “session opened”
Failed user login: “authentication failure”, “failed password”
User log-off: “session closed”
User account change or deletion: “password changed”, “new user”, “delete user”
Sudo actions: “sudo: … COMMAND=…”, “FAILED su”
Service failure: “failed” or “failure”

Note that one IP address is making a ton of invalid requests. Maybe I should correlate with the web server request logs too? Things to look for in those web logs:

Excessive access attempts to non-existent files
Code (SQL, HTML) seen as part of the URL
Access to extensions you have not implemented
Web service stopped/started/failed messages
Access to “risky” pages that accept user input
Look at logs on all servers in the load balancer pool
Error code 200 on files that are not yours
Failed user authentication: error codes 401, 403
Invalid request: error code 400
Internal server error: error code 500

Here is the data correlated to successful logins, failed logins, and failed logins to an invalid user. Notice the suspicious IP address.


Links


Python Spark processing auth.log file


Auth log analysis with Spark:
import sys
from operator import add
from time import strftime
from pyspark import SparkContext

outFile = "counts" + strftime("%Y-%m-%d")
logFile = "auth.log"
destination = "local"
appName = "Simple App"

sc = SparkContext(destination, appName)
logData = sc.textFile(logFile).cache()
failedData = logData.filter(lambda x: 'Failed' in x)
rootData = failedData.filter(lambda s: 'root' in s)

def splitville(line):
 return line.split("from ")[1].split()[0]

ipAddresses = rootData.map(splitville)  # extract the source IP of each failed root login
counts = x: (x, 1)).reduceByKey(add)
output = counts.collect()
for (word, count) in output:
 print "%s: %i" % (word, count)

Scala and Algebird examples in the REPL


//scala wordcount example
val lines = Source.fromFile("").getLines.toArray
val words = lines.flatMap(_.split(" "))
val emptyCounts = Map[String, Int]().withDefaultValue(0)
val counts = words.foldLeft(emptyCounts)((currentCounts, word) => currentCounts.updated(word, currentCounts(word) + 1))

//algebird hyperloglog
import com.twitter.algebird._
import HyperLogLog._
val hll = new HyperLogLogMonoid(4)
val data = List(1, 1, 2, 2, 3, 3, 4, 4, 5, 5)
val seqHll = { hll(_) }
val sumHll = hll.sum(seqHll)
val approxSizeOf = hll.sizeOf(sumHll)
val actualSize = data.toSet.size
val estimate = approxSizeOf.estimate

//algebird bloomfilter
import com.twitter.algebird._
val NUM_HASHES = 6
val WIDTH = 32
val SEED = 1
val bfMonoid = new BloomFilterMonoid(NUM_HASHES, WIDTH, SEED)
val bf = bfMonoid.create("1", "2", "3", "4", "100")
val approxBool = bf.contains("1")
val res = approxBool.isTrue

//algebird countMinSketch
import com.twitter.algebird._
val DELTA = 1E-10
val EPS = 0.001
val SEED = 1
val CMS_MONOID = new CountMinSketchMonoid(EPS, DELTA, SEED)
val data = List(1L, 1L, 3L, 4L, 5L)
val cms = CMS_MONOID.create(data)
// this CountMinSketchMonoid counts Long keys; frequencies come back as approximations
cms.frequency(1L)

//sketch map
import com.twitter.algebird._
val DELTA = 1E-8
val EPS = 0.001
val SEED = 1
val HEAVY_HITTERS_COUNT = 10

implicit def string2Bytes(i: String) = i.toCharArray.map(_.toByte)

// SketchMapParams needs a heavy-hitters count in addition to seed/eps/delta
val PARAMS = SketchMapParams[String](SEED, EPS, DELTA, HEAVY_HITTERS_COUNT)
val MONOID = SketchMap.monoid[String, Long](PARAMS)
val data = List( ("1", 1L), ("3", 2L), ("4", 1L), ("5", 1L) )
val sm = MONOID.create(data) 
MONOID.frequency(sm, "1")
MONOID.frequency(sm, "2")
MONOID.frequency(sm, "3")

Spark Streaming Word Count from Network Socket in Scala

package com.wordpress.bigsnarf.spark.streaming.examples

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

/**
 * Counts words in text encoded with UTF8 received from the network every second.
 */
object NetworkWordCount {
  def main(args: Array[String]) {

    if (args.length < 3) {
      System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
        "In local mode, <master> should be 'local[n]' with n > 1")
      System.exit(1)
    }

    // Spark Streaming Context with a 1-second batch interval
    val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))

    // Socket takes Text Stream
    val lines = ssc.socketTextStream(args(1), args(2).toInt)

    // split words
    val words = lines.flatMap(_.split(" "))

    // count the words: tuple -> reduce
    val wordCounts = => (x, 1)).reduceByKey(_ + _)

    // print to screen and start the computation
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Setting up Spark on your MacBook (videos)

PySpark analysis of an Apache access log

# read in hdfs file to Spark Context object and cache
logs = sc.textFile('hdfs:///big-access-log').cache()

# create filters
errors500 = logs.filter(lambda logline: "500" in logline)
errors404 = logs.filter(lambda logline: "404" in logline)
errors200 = logs.filter(lambda logline: "200" in logline)
# grab counts
e500_count = errors500.count()
e404_count = errors404.count()
e200_count = errors200.count()
# bring the results back to this box locally
local_500 = errors500.take(e500_count)
local_404 = errors404.take(e404_count)
local_200 = errors200.take(e200_count)

def make_ip_list(iterable):
    m = []
    for line in iterable:
        m.append(line.split()[0])  # client IP is the first field of an access log line
    return m

def list_count(iterable):
    d = {}
    for i in iterable:
        if i in d:
            d[i] = d[i] + 1
        else:
            d[i] = 1
    return d
# results of people making 500, 404, and 200 requests for the dataset
ip_addresses_making_500_requests = list_count(make_ip_list(local_500))
ip_addresses_making_404_requests = list_count(make_ip_list(local_404))
ip_addresses_making_200_requests = list_count(make_ip_list(local_200))

Security Big Data Analytics Solutions

Building a system that can do full-context PCAP capture for a single machine is trivial, IMHO, compared to creating predictive algorithms for analyzing PCAP traffic. There are log search solutions like Elasticsearch, Graylog2, ELSA, Splunk, and Logstash that can help you archive and dig through the data.

My favorite network traffic big data solution (2012) is PacketPig. In 2014 I noticed another player named PacketSled. I found a nice setup by AlienVault. Security Onion, with the Bro IDS and friends, is a great network security distro. I have seen one called xtractr, MapReduce for forensics. Several solutions exist, and PCAP files can be fed to the engines for analysis. I think ARGUS and Moloch (PCAP plus Elasticsearch) have a place here too, but I haven’t tackled them yet. There’s a DNS Hadoop presentation from Endgame, clairvoyant-squirrel. There are also OpenFPC and streamDB, and DNS tools like PassiveDNS. ELSA is another tool.

I started with a Perl program that converts PCAP to CSV, and have written my own sniffer-to-CSV in Scapy. Super timelines are being done in Python too. Once I get a PCAP file converted to CSV, I load it into HDFS via HUE. I also found a nice PCAP visualization blog entry by Raffael Marty.

I’ve stored a bunch of CSV network traces and done analysis using Hive and Pig queries. It was very simple: name the columns and query each column looking for specific entries. Very labour-intensive. Binary analysis on Hadoop is another angle.

I’m working on a MapReduce library that uses machine learning to classify attackers and their network patterns. As of 2013, a few commercial vendors like IBM and RSA have added Hadoop capability to their SIEM product lines. Here is Twitter’s logging setup. In 2014 I loaded all the CSV attack data into a CDH4 cluster with the Impala query engine. I’m also looking at writing pandas DataFrames to Google’s BigQuery. As of 2014 there are Hadoop solutions for malware analysis, forensics, and DNS data mining. Cisco has released their OpenSOC here.

The biggest advantage of all these systems will be DATA ENRICHMENT: feeding and combining data to turn a weak signal into actionable insight.

There are a few examples of PCAP ingestion with open source tools like Hadoop:

First one I found was P3:

The second presentation I found was Wayne Wheeler’s SherpaSurfing, and

The third I found was

The fourth project I found was presented at BlackHatEU 2012 by PacketLoop and


AOL Moloch is PCAP Elasticsearch full packet search


Moloch is an open source, large scale IPv4 packet capturing (PCAP), indexing and database system. A simple web interface is provided for PCAP browsing, searching, and exporting. APIs are exposed that allow PCAP data and JSON-formatted session data to be downloaded directly. Simple security is implemented by using HTTPS and HTTP digest password support or by using Apache in front. Moloch is not meant to replace IDS engines but instead works alongside them to store and index all the network traffic in standard PCAP format, providing fast access. Moloch is built to be deployed across many systems and can scale to handle multiple gigabits/sec of traffic.

Installation is pretty simple for a POC:

  1. Spin up an Ubuntu box
  2. Update all the packages
  3. git clone
  4. follow tutorial if you must
  5. cd moloch
  6. ./
  7. follow prompts
  8. load sample PCAPs from
  9. Have fun with Moloch

NSRL and Mandiant MD5 in python Bloom Filters
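A minimal sketch of the idea, assuming you just want fast membership tests against a set of MD5 hashes; the sizes and hash counts are illustrative, not tuned for the real NSRL or Mandiant datasets:

```python
import hashlib

class BloomFilter:
    """Minimal Bloom filter sketch: k positions per item in a bit array.
    May report false positives, never false negatives."""
    def __init__(self, size_bits=1 << 20, num_hashes=6):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bits // 8)

    def _positions(self, item):
        # Derive k positions by salting SHA-1 with the hash index
        for i in range(self.num_hashes):
            h = hashlib.sha1(("%d:%s" % (i, item)).encode()).hexdigest()
            yield int(h, 16) % self.size

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item):
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(item))

bf = BloomFilter()
known_md5 = hashlib.md5(b"malware sample").hexdigest()
bf.add(known_md5)
print(known_md5 in bf)  # True
```

Loading the NSRL or Mandiant MD5 lists is then just `bf.add(...)` per hash, with the whole set held in a few megabytes of bits instead of gigabytes of strings.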

