from network_pipeline.consts import SOURCE
from network_pipeline.consts import FORWARD_EXCHANGE
from network_pipeline.consts import FORWARD_ROUTING_KEY
from network_pipeline.consts import FORWARD_QUEUE
from spylunking.log.setup_logging import console_logger
from network_pipeline.utils import rnow
from network_pipeline.convert_pkt_to_json import convert_pkt_to_json
from network_pipeline.publisher import pub
import kamene.all as kamene
log = console_logger(
name='proc')
[docs]def handle_packets(pk):
"""handle_packets
:param pk: data packet that kamene sends in
"""
log.info(("processing with pub={}")
.format(pub))
# get the lowest layer
eth = pk.getlayer(kamene.Ether)
should_forward = False
send_msg = {"data": {},
"created": rnow(),
"source": SOURCE}
if eth:
# parse all layer frames under ethernet
send_msg["data"] = convert_pkt_to_json(eth)
should_forward = True
else:
log.error(("unsupported pk={}")
.format(pk))
# end of if supported
if should_forward:
log.info("forwarding")
# Publish the message:
msg_sent = pub.publish(body=send_msg,
exchange=FORWARD_EXCHANGE,
routing_key=FORWARD_ROUTING_KEY,
queue=FORWARD_QUEUE,
serializer="json",
retry=True)
log.info("done forwarding={}".format(msg_sent))
# end of should_forward
# end of handle_packets