import os
import sys
import json
import pandas as pd
# json_normalize moved to the top-level pandas namespace in pandas 1.0
try:
    from pandas import json_normalize
except ImportError:
    # fall back for older pandas releases
    from pandas.io.json import json_normalize
from celery_connectors.utils import ev
from spylunking.log.setup_logging import console_logger
from network_pipeline.utils import ppj
from network_pipeline.utils import rnow
from network_pipeline.build_packet_key import build_packet_key
from antinex_client.consts import SUCCESS
from antinex_client.consts import FAILED
from antinex_client.consts import ERROR
from antinex_client.consts import LOGIN_FAILED
from antinex_client.consts import ANTINEX_PUBLISH_ENABLED
from antinex_client.consts import ANTINEX_PUBLISH_REQUEST_FILE
from antinex_client.consts import ANTINEX_USE_MODEL_NAME
from antinex_client.consts import ANTINEX_URL
from antinex_client.consts import ANTINEX_USER
from antinex_client.consts import ANTINEX_MISSING_VALUE
from antinex_client.build_ai_client_from_env import build_ai_client_from_env
from antinex_client.generate_ai_request import generate_ai_request
log = console_logger(
name='csv')
class RecordPacketsToCSV:
    """RecordPacketsToCSV

    Flatten consumed packet dictionaries into csv rows, archive the
    raw messages as json and optionally publish the rows to the
    AntiNex REST API for predictions
    """
def __init__(self):
"""__init__"""
self.recv_msgs = []
# save every nth number of messages
self.save_after_num = int(
ev("SAVE_AFTER_NUM",
"100"))
# shutdown after this number of messages
self.stop_after_num = int(
ev("STOP_AFTER_NUM",
"-1"))
        if self.save_after_num <= 0:
            # guard against a modulo-by-zero in handle_msg
            self.save_after_num = 1
if self.stop_after_num < 0:
self.stop_after_num = None
# shutdown if this file is found
self.stop_for_file = ev(
"STOP_FILE",
"/tmp/stop-recording-csv")
self.dataset_name = ev(
"DS_NAME",
"netdata")
self.save_dir = ev(
"DS_DIR",
"/tmp")
self.save_to_file = ev(
"OUTPUT_CSV",
"{}/{}-{}.csv".format(
self.save_dir,
self.dataset_name,
rnow("%Y-%m-%d-%H-%M-%S")))
self.archive_file = ev(
"ARCHIVE_JSON",
"{}/packets-{}-{}.json".format(
self.save_dir,
self.dataset_name,
rnow("%Y-%m-%d-%H-%M-%S")))
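        # with the defaults these land at timestamped paths like
        # (illustrative timestamps):
        #   /tmp/netdata-2020-01-01-00-00-00.csv
        #   /tmp/packets-netdata-2020-01-01-00-00-00.json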
        self.debug = (ev(
            "DEBUG_PACKETS",
            "0") == "1")
self.df = None
self.last_df = None
self.eth_keys = {"eth_id": "id"}
self.ip_keys = {"ip_id": "id"}
self.ipvsix_keys = {"ipvsix_id": "id"}
self.icmp_keys = {"icmp_id": "id"}
self.arp_keys = {"arp_id": "id"}
self.tcp_keys = {"tcp_id": "id"}
self.udp_keys = {"udp_id": "id"}
self.dns_keys = {"dns_id": "id"}
self.raw_keys = {"raw_id": "id"}
self.pad_keys = {"pad_id": "id"}
self.all_keys = {}
self.all_keys_list = []
self.all_eth = []
self.all_ip = []
self.all_ipvsix = []
self.all_icmp = []
self.all_arp = []
self.all_tcp = []
self.all_udp = []
self.all_dns = []
self.all_raw = []
self.all_pad = []
self.all_flat = []
self.all_rows = []
# noqa https://github.com/jay-johnson/antinex-client/blob/5fbcefaaed3d979b3c0829447b61592d5910ef22/antinex_client/build_ai_client_from_env.py#L19
self.client = build_ai_client_from_env()
# the client uses environment variables:
# noqa https://github.com/jay-johnson/antinex-client/blob/5fbcefaaed3d979b3c0829447b61592d5910ef22/antinex_client/consts.py#L23
# here is an example of what to export:
# noqa https://github.com/jay-johnson/antinex-client/blob/master/examples/example-prediction.env
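        # the recorder itself is configured through the environment
        # variables read above - an illustrative setup:
        #   export SAVE_AFTER_NUM=100
        #   export STOP_AFTER_NUM=-1   # -1 disables auto-shutdown
        #   export STOP_FILE=/tmp/stop-recording-csv
        #   export DS_NAME=netdata
        #   export DS_DIR=/tmp
        #   export DEBUG_PACKETS=0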
self.request_dict = {}
if ANTINEX_PUBLISH_ENABLED:
if os.path.exists(ANTINEX_PUBLISH_REQUEST_FILE):
with open(ANTINEX_PUBLISH_REQUEST_FILE, "r") as f:
self.request_dict = json.loads(f.read())
# if publishing is enabled
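        # the loaded dictionary is handed to generate_ai_request as
        # the base request (req_dict) when predictions are published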
# end of __init__
    def process_ether_frame(self,
id=None,
msg=None):
"""process_ether_frame
Convert a complex nested json dictionary
to a flattened dictionary and capture
all unique keys for table construction
:param id: key for this msg
:param msg: ether frame for packet
"""
# normalize into a dataframe
df = json_normalize(msg)
# convert to a flattened dictionary
dt = json.loads(df.to_json())
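        # json_normalize builds a single-row dataframe, so to_json()
        # (default orient="columns") yields {"<column>": {"0": <value>}};
        # the loop below pulls each value out by that "0" row label and
        # prefixes the column name, e.g. a hypothetical column "dst"
        # becomes flat_msg["eth_dst"]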
flat_msg = {}
for k in dt:
new_key = "eth_{}".format(k)
flat_msg[new_key] = dt[k]["0"]
if new_key not in self.eth_keys:
self.eth_keys[new_key] = k
# end of capturing all unique keys
dt["eth_id"] = id
self.all_eth.append(dt)
log.debug("ETHER data updated:")
log.debug(self.eth_keys)
log.debug(self.all_eth)
log.debug("")
return flat_msg
# end of process_ether_frame
    def process_ip_frame(self,
id=None,
msg=None):
"""process_ip_frame
Convert a complex nested json dictionary
to a flattened dictionary and capture
all unique keys for table construction
:param id: key for this msg
:param msg: ip frame for packet
"""
# normalize into a dataframe
df = json_normalize(msg)
# convert to a flattened dictionary
dt = json.loads(df.to_json())
flat_msg = {}
for k in dt:
new_key = "ip_{}".format(k)
flat_msg[new_key] = dt[k]["0"]
if new_key not in self.ip_keys:
self.ip_keys[new_key] = k
# end of capturing all unique keys
dt["ip_id"] = id
self.all_ip.append(dt)
log.debug("IP data updated:")
log.debug(self.ip_keys)
log.debug(self.all_ip)
log.debug("")
return flat_msg
# end of process_ip_frame
    def process_ipvsix_frame(self,
id=None,
msg=None):
"""process_ipvsix_frame
Convert a complex nested json dictionary
to a flattened dictionary and capture
all unique keys for table construction
:param id: key for this msg
:param msg: ipv6 frame for packet
"""
# normalize into a dataframe
df = json_normalize(msg)
# convert to a flattened dictionary
dt = json.loads(df.to_json())
flat_msg = {}
for k in dt:
new_key = "ipv6_{}".format(k)
flat_msg[new_key] = dt[k]["0"]
if new_key not in self.ipvsix_keys:
self.ipvsix_keys[new_key] = k
# end of capturing all unique keys
dt["ipv6_id"] = id
self.all_ipvsix.append(dt)
log.debug("IPV6 data updated:")
log.debug(self.ipvsix_keys)
log.debug(self.all_ipvsix)
log.debug("")
return flat_msg
    # end of process_ipvsix_frame
    def process_tcp_frame(self,
id=None,
msg=None):
"""process_tcp_frame
Convert a complex nested json dictionary
to a flattened dictionary and capture
all unique keys for table construction
:param id: key for this msg
:param msg: tcp frame for packet
"""
# normalize into a dataframe
df = json_normalize(msg)
# convert to a flattened dictionary
dt = json.loads(df.to_json())
flat_msg = {}
for k in dt:
new_key = "tcp_{}".format(k)
flat_msg[new_key] = dt[k]["0"]
if new_key not in self.tcp_keys:
self.tcp_keys[new_key] = k
# end of capturing all unique keys
dt["tcp_id"] = id
self.all_tcp.append(dt)
log.debug("TCP data updated:")
log.debug(self.tcp_keys)
log.debug(self.all_tcp)
log.debug("")
return flat_msg
# end of process_tcp_frame
    def process_udp_frame(self,
id=None,
msg=None):
"""process_udp_frame
Convert a complex nested json dictionary
to a flattened dictionary and capture
all unique keys for table construction
:param id: key for this msg
:param msg: udp frame for packet
"""
# normalize into a dataframe
df = json_normalize(msg)
# convert to a flattened dictionary
dt = json.loads(df.to_json())
flat_msg = {}
for k in dt:
new_key = "udp_{}".format(k)
flat_msg[new_key] = dt[k]["0"]
if new_key not in self.udp_keys:
self.udp_keys[new_key] = k
# end of capturing all unique keys
dt["udp_id"] = id
self.all_udp.append(dt)
log.debug("UDP data updated:")
log.debug(self.udp_keys)
log.debug(self.all_udp)
log.debug("")
return flat_msg
# end of process_udp_frame
    def process_dns_frame(self,
id=None,
msg=None):
"""process_dns_frame
Convert a complex nested json dictionary
to a flattened dictionary and capture
all unique keys for table construction
:param id: key for this msg
:param msg: dns frame for packet
"""
# normalize into a dataframe
df = json_normalize(msg)
# convert to a flattened dictionary
dt = json.loads(df.to_json())
flat_msg = {}
for k in dt:
new_key = "dns_{}".format(k)
flat_msg[new_key] = dt[k]["0"]
if new_key not in self.dns_keys:
self.dns_keys[new_key] = k
# end of capturing all unique keys
dt["dns_id"] = id
self.all_dns.append(dt)
log.debug("DNS data updated:")
log.debug(self.dns_keys)
log.debug(self.all_dns)
log.debug("")
return flat_msg
# end of process_dns_frame
    def process_icmp_frame(self,
id=None,
msg=None):
"""process_icmp_frame
Convert a complex nested json dictionary
to a flattened dictionary and capture
all unique keys for table construction
:param id: key for this msg
:param msg: icmp frame for packet
"""
# normalize into a dataframe
df = json_normalize(msg)
# convert to a flattened dictionary
dt = json.loads(df.to_json())
flat_msg = {}
for k in dt:
new_key = "icmp_{}".format(k)
flat_msg[new_key] = dt[k]["0"]
if new_key not in self.icmp_keys:
self.icmp_keys[new_key] = k
# end of capturing all unique keys
dt["icmp_id"] = id
self.all_icmp.append(dt)
log.debug("ICMP data updated:")
log.debug(self.icmp_keys)
log.debug(self.all_icmp)
log.debug("")
return flat_msg
# end of process_icmp_frame
    def process_arp_frame(self,
id=None,
msg=None):
"""process_arp_frame
Convert a complex nested json dictionary
to a flattened dictionary and capture
all unique keys for table construction
:param id: key for this msg
:param msg: arp frame for packet
"""
# normalize into a dataframe
df = json_normalize(msg)
# convert to a flattened dictionary
dt = json.loads(df.to_json())
flat_msg = {}
for k in dt:
new_key = "arp_{}".format(k)
flat_msg[new_key] = dt[k]["0"]
if new_key not in self.arp_keys:
self.arp_keys[new_key] = k
# end of capturing all unique keys
dt["arp_id"] = id
self.all_arp.append(dt)
log.debug("ARP data updated:")
log.debug(self.arp_keys)
log.debug(self.all_arp)
log.debug("")
return flat_msg
# end of process_arp_frame
    def process_raw_frame(self,
id=None,
msg=None):
"""process_raw_frame
Convert a complex nested json dictionary
to a flattened dictionary and capture
all unique keys for table construction
:param id: key for this msg
:param msg: raw frame for packet
"""
# normalize into a dataframe
df = json_normalize(msg)
# convert to a flattened dictionary
dt = json.loads(df.to_json())
flat_msg = {}
for k in dt:
new_key = "raw_{}".format(k)
flat_msg[new_key] = dt[k]["0"]
if new_key not in self.raw_keys:
self.raw_keys[new_key] = k
# end of capturing all unique keys
dt["raw_id"] = id
self.all_raw.append(dt)
log.debug("RAW data updated:")
log.debug(self.raw_keys)
log.debug(self.all_raw)
log.debug("")
return flat_msg
# end of process_raw_frame
    def process_pad_frame(self,
id=None,
msg=None):
"""process_pad_frame
Convert a complex nested json dictionary
to a flattened dictionary and capture
all unique keys for table construction
:param id: key for this msg
:param msg: pad frame for packet
"""
# normalize into a dataframe
df = json_normalize(msg)
# convert to a flattened dictionary
dt = json.loads(df.to_json())
flat_msg = {}
for k in dt:
new_key = "pad_{}".format(k)
flat_msg[new_key] = dt[k]["0"]
if new_key not in self.pad_keys:
self.pad_keys[new_key] = k
# end of capturing all unique keys
dt["pad_id"] = id
self.all_pad.append(dt)
log.debug("PAD data updated:")
log.debug(self.pad_keys)
log.debug(self.all_pad)
log.debug("")
return flat_msg
# end of process_pad_frame
    def build_flat_msg(self,
id=None,
msg=None):
"""build_flat_msg
:param id: unique id for this message
:param msg: message dictionary to flatten
"""
flat_msg = {}
if not id:
log.error("Please pass in an id")
return None
if not msg:
log.error("Please pass in a msg")
return None
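        # an incoming msg is expected to look like (illustrative):
        #   {"data": {"ether": {...}, "ip": {...}, "tcp": {...}}}
        # each supported frame below contributes flattened keys with
        # its own prefix (eth_*, ip_*, tcp_*, ...)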
for k in msg["data"]:
if k == "ether":
flat_msg.update(self.process_ether_frame(
id=id,
msg=msg["data"][k]))
# end of ether
elif k == "ip":
flat_msg.update(self.process_ip_frame(
id=id,
msg=msg["data"][k]))
# end of ip
elif k == "ipv6":
flat_msg.update(self.process_ipvsix_frame(
id=id,
msg=msg["data"][k]))
# end of ipv6
elif k == "tcp":
flat_msg.update(self.process_tcp_frame(
id=id,
msg=msg["data"][k]))
# end of tcp
elif k == "udp":
flat_msg.update(self.process_udp_frame(
id=id,
msg=msg["data"][k]))
# end of udp
elif k == "dns":
flat_msg.update(self.process_dns_frame(
id=id,
msg=msg["data"][k]))
# end of dns
elif k == "icmp":
flat_msg.update(self.process_icmp_frame(
id=id,
msg=msg["data"][k]))
# end of icmp
elif k == "arp":
flat_msg.update(self.process_arp_frame(
id=id,
msg=msg["data"][k]))
# end of arp
elif k == "raw":
flat_msg.update(self.process_raw_frame(
id=id,
msg=msg["data"][k]))
# end of raw
elif k == "padding":
flat_msg.update(self.process_pad_frame(
id=id,
msg=msg["data"][k]))
# end of pad
else:
log.error(("Unsupported frame type={} "
"please file an issue to track this "
"with data={} msg={}")
.format(k,
ppj(msg["data"][k]),
msg["data"]))
# end of processing new message
return flat_msg
# end of build_flat_msg
    def build_all_keys_dict(self):
"""build_all_keys_dict"""
log.info("finding keys")
        # merge every per-protocol key dictionary into all_keys
        for key_dict in (self.eth_keys,
                         self.ip_keys,
                         self.ipvsix_keys,
                         self.icmp_keys,
                         self.arp_keys,
                         self.tcp_keys,
                         self.udp_keys,
                         self.dns_keys,
                         self.raw_keys,
                         self.pad_keys):
            for k in key_dict:
                if k not in self.all_keys:
                    self.all_keys[k] = k
        # end of merging per-protocol keys
# this will be the columns for the csv
for k in self.all_keys:
self.all_keys_list.append(k)
log.debug(("unique all_keys keys={} values={}")
.format(len(self.all_keys_list),
self.all_keys))
# end of build_all_keys_dict
    def flatten_all(self):
"""flatten_all"""
log.info("flattening - START")
self.all_rows = []
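        # give every row the full column set so the csv schema is
        # uniform - keys a packet does not have are filled with None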
for idx, r in enumerate(self.all_flat):
new_row = {"idx": idx}
for k in self.all_keys_list:
if k in r:
new_row[k] = r[k]
else:
new_row[k] = None
# end of for all keys
self.all_rows.append(new_row)
# end of all_keys
log.info("flattening - END")
# end of flatten_all
    def create_json_archive(self):
"""create_json_archive"""
archive_data = {"packets": self.recv_msgs,
"dataset": self.dataset_name,
"num_packets": len(self.recv_msgs),
"created": rnow()}
self.write_to_file(archive_data,
self.archive_file)
# end of create_json_archive
    def convert_to_df(self):
"""convert_to_df"""
log.info(("converting={}")
.format(len(self.all_rows)))
if len(self.all_rows) == 0:
return
self.df = pd.DataFrame(self.all_rows).set_index("idx")
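        # "idx" (the enumeration counter from flatten_all) becomes
        # the dataframe index and the first csv column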
if len(self.df) != len(self.all_rows):
log.error(("Failed converting={} to rows={}")
.format(len(self.all_rows),
len(self.df)))
else:
log.info(("converted={} into rows={}")
.format(len(self.all_rows),
len(self.df)))
# end of convert_to_df
    def write_to_file(self,
data_dict,
output_file_path):
"""write_to_file
        :param data_dict: dictionary to serialize and write
        :param output_file_path: path for the output file
"""
log.info("saving={}".format(output_file_path))
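        # ppj serializes the dictionary to pretty-printed json text
        # (an assumption based on its use for the json archive here)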
with open(output_file_path, "w") as output_file:
output_file.write(str(ppj(data_dict)))
# end of write_to_file
    def save_df_as_csv(self):
"""save_df_as_csv"""
if len(self.all_rows) == 0:
log.info(("no df={} to save")
.format(self.df))
return
else:
log.info(("saving "
"packets={} file={} rows={}")
.format(len(self.recv_msgs),
self.save_to_file,
len(self.df)))
self.df.to_csv(self.save_to_file,
sep=",",
encoding="utf-8",
index=True)
log.info(("done saving={}")
.format(self.save_to_file))
# end of saving if the dataframe is there
# end of save_df_as_csv
    def save_data(self):
"""save_data"""
state = ""
try:
state = "create_json_archive"
log.info("creating json archive")
self.create_json_archive()
state = "building_unique_keys"
log.info("processing all unique keys")
self.build_all_keys_dict()
state = "flattening"
log.info("flattening all data")
self.flatten_all()
state = "converting"
log.info("converting to df")
self.convert_to_df()
state = "saving"
            log.info("saving df to csv")
self.save_df_as_csv()
if ANTINEX_PUBLISH_ENABLED:
log.info(("publishing stream to rest={}")
.format(
ANTINEX_URL))
self.publish_predictions_to_core()
# end of if publishing to the core
except Exception as e:
log.error(("Failed state={} with ex={} to "
"save={}")
.format(state,
e,
self.save_to_file))
# end of save_data
    def handle_msg(self,
body,
org_message):
"""handle_msg
:param body: dictionary contents from the message body
:param org_message: message object can ack, requeue or reject
"""
if os.path.exists(self.stop_for_file):
log.info(("Detected stop_file={} "
"shutting down")
.format(self.stop_for_file))
# drop the message back in the queue
# for next time
org_message.requeue()
sys.exit(1)
# end of stop file detection
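        # to stop a running recorder from another shell (the path is
        # configurable with STOP_FILE):
        #   touch /tmp/stop-recording-csv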
try:
log.debug(("handle body={}")
.format(ppj(body)))
msg = body
id = build_packet_key()
recv_time = rnow()
# this could be made into celery tasks...
flat_msg = self.build_flat_msg(
id=id,
msg=msg)
if not flat_msg:
log.error(("Failed to build a flat message "
"for message={}")
.format(msg))
return
msg["id"] = id
msg["received"] = recv_time
if len(flat_msg) > 0:
if self.debug:
log.info(ppj(flat_msg))
flat_msg["id"] = id
flat_msg["received"] = recv_time
self.all_flat.append(flat_msg)
self.recv_msgs.append(msg)
# end of adding all flat messages
            already_saved = False
            num_recv = len(self.recv_msgs)
            if (num_recv % self.save_after_num) == 0:
                # flag the snapshot save so the shutdown path
                # below does not save a second time
                already_saved = True
                self.save_data()
            # end of saving a snapshot
if self.stop_after_num:
if num_recv >= self.stop_after_num:
if not already_saved:
self.save_data()
# avoid waiting on the save again
log.info("archive successful - purging buffer")
sys.exit(2)
# shutdown - good for testing
                    # if not set up for infinite consuming
except Exception as e:
log.error(("Failed processing msg={} "
"ex={}")
.format(body,
e))
# end of processing message
try:
org_message.ack()
except Exception as e:
log.error(("Failed ack-ing msg={} "
"ex={}")
.format(body,
e))
        # end of acknowledging the message was processed
        log.info("done handle")
    # end of handle_msg
    def publish_predictions_to_core(self):
"""publish_predictions_to_core"""
status = FAILED
msg = "not started"
try:
msg = "generating request"
log.info(msg)
# noqa https://stackoverflow.com/questions/29815129/pandas-dataframe-to-list-of-dictionaries
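            # fillna replaces NaN cells with ANTINEX_MISSING_VALUE and
            # to_dict("records") turns the dataframe into a list of
            # per-row dictionaries for the prediction request payload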
publish_req = generate_ai_request(
predict_rows=self.df.fillna(
ANTINEX_MISSING_VALUE).to_dict("records"),
req_dict=self.request_dict)
if publish_req["status"] != SUCCESS:
log.error(("failed generate_ai_request with err={}")
.format(
publish_req["error"]))
status = ERROR
else:
msg = "publishing as user={} url={} model={}".format(
ANTINEX_USER,
ANTINEX_URL,
ANTINEX_USE_MODEL_NAME)
log.info(msg)
response = self.client.run_job(
body=publish_req["data"])
if response["status"] == SUCCESS:
log.info("predictions sent")
status = SUCCESS
elif response["status"] == FAILED:
log.error(("job failed with error='{}' with response={}")
.format(
response["error"],
response["data"]))
status = ERROR
elif response["status"] == ERROR:
log.error(("job had an error='{}' with response={}")
.format(
response["error"],
response["data"]))
status = ERROR
elif response["status"] == LOGIN_FAILED:
log.error(("job reported user was not able to log in "
"with an error='{}' with response={}")
.format(
response["error"],
response["data"]))
status = ERROR
# logging for good/bad cases during publish
# if generated a good request
except Exception as e:
            log.error(("publishing predictions failed last_step='{}' ex={}")
.format(
msg,
e))
# end of try/ex
return status
# end of publish_predictions_to_core
# end of RecordPacketsToCSV
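

# Minimal usage sketch (illustrative only - this consumer wiring is an
# assumption; in practice the class is driven by a queue subscriber
# that invokes handle_msg once per delivered packet):
#
#   recorder = RecordPacketsToCSV()
#
#   def on_message(body, org_message):
#       # body is the packet dictionary; org_message must support
#       # ack() and requeue()
#       recorder.handle_msg(body, org_message)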