I'm currently working on a script that has to run for a fixed amount of time (let's say a minute), then stop executing for about 5 seconds, and then start executing again, with this cycle repeating without fail. The script is written in Python 3.x and runs on Ubuntu, so creating a service/daemon out of it would also work (as long as execution still pauses for a few seconds each cycle).
The script uses the Scapy module to sniff packets from a live capture, does some analysis on the captured packets, and then inserts the data into the database. Right now the insertion only happens when I stop the execution with Ctrl+C:
the script stops, and only then inserts the data into the DB, rather than doing both in parallel. It would be better if the capture and the DB insertion ran in parallel so the script never had to stop executing, but until then I need a workaround.
My Approach:
import scapy

def main():
    capture = LiveCapture(interface="<some interface>", filter="<some filter>")
    count = 0
    for pkt in capture:
        # DO SOMETHING with pkt
        count += 1
        insert_in_DB()  # -------- This happens only when I stop the execution.
        if count == 100:
            count = 0
            # back to main()
So, you get the general idea of what my code is trying to do, right? But I want this to happen on a cycle: after running for 1 minute, execution should stop so that the captured data can be inserted into the DB, and then capturing should start again after 5 seconds or less.
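To make the requirement concrete, here is a rough sketch of the cycle I'm after. This is only my guess at the right shape, using scapy's sniff() with its timeout argument; flush_to_db() is a placeholder for my actual insert logic:

from scapy.all import sniff
import time

def flush_to_db(packets):
    # placeholder for the real insert_in_DB() logic on a batch of packets
    pass

def main():
    while True:
        # capture live traffic for ~60 seconds, then return the batch
        packets = sniff(iface="<some interface>", filter="<some filter>", timeout=60)
        # capturing is paused here while the batch is written to the DB
        flush_to_db(packets)
        time.sleep(5)  # resume capturing after ~5 seconds

if __name__ == "__main__":
    main()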
Thanks in advance :)
You have to use MySQLdb to talk to MySQL from Python, and Twisted's adbapi to perform the database connections asynchronously.
MySQLdb:
sudo apt-get install python3-dev
sudo apt-get install libmysqlclient-dev
pip3 install mysqlclient  # the Python 3 fork of MySQL-python
Twisted adbapi:
pip3 install twisted
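To illustrate what adbapi gives you, here is a minimal standalone sketch (separate from the pipeline below; the connection parameters are placeholders). Queries run in a thread pool and return Deferreds instead of blocking:

from twisted.enterprise import adbapi
from twisted.internet import reactor

# a pool of MySQL connections; each query runs in a worker thread,
# so the reactor (event loop) is never blocked
dbpool = adbapi.ConnectionPool("MySQLdb", db="your_db", user="your_username",
                               passwd="your_password", host="your_host")

def on_rows(rows):
    print(rows)  # a sequence of result rows
    reactor.stop()

def on_error(failure):
    failure.printTraceback()
    reactor.stop()

# runQuery returns a Deferred that fires with the result instead of blocking
d = dbpool.runQuery("SELECT VERSION()")
d.addCallbacks(on_rows, on_error)

reactor.run()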
Spider.py
def parse(self, response):
    yield {
        'item_id': ...,
        'item_name': ...,
        ...
    }
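For context, the parse method above lives inside an ordinary Scrapy spider. A minimal sketch, where the spider name, start URL, and field values are all placeholders:

import scrapy

class ItemSpider(scrapy.Spider):
    name = "items"                       # placeholder spider name
    start_urls = ["http://example.com"]  # placeholder start URL

    def parse(self, response):
        # every dict yielded here is passed through the pipelines
        # declared in settings.py
        yield {
            'item_id': response.url,  # placeholder field values
            'item_name': 'example',
        }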
Add a MySQLStorePipeline to pipelines.py and declare it in settings.py:
pipelines.py
from twisted.enterprise import adbapi  # pip3 install twisted
from scrapy import log
from scrapy.conf import settings
import MySQLdb.cursors

class MySQLStorePipeline(object):
    """A pipeline to store the item in a MySQL database.
    This implementation uses Twisted's asynchronous database API."""

    def __init__(self):
        dbargs = settings.get('DB_CONN')
        self.dbpool = adbapi.ConnectionPool(
            "MySQLdb",
            cursorclass=MySQLdb.cursors.DictCursor,
            **dbargs
        )

    def process_item(self, item, spider):
        # run the db query in the connection pool's thread pool
        query = self.dbpool.runInteraction(self._conditional_insert, item)
        query.addErrback(self.handle_error)
        return item

    def _conditional_insert(self, tx, item):
        # create the record if it doesn't exist;
        # this whole block runs in its own thread
        item_id = item['item_id']
        db_table = 'your_table_name'
        try:
            tx.execute("SELECT 1 FROM " + db_table + " WHERE item_id = %s",
                       (item_id, ))
        except MySQLdb.Error:
            print("## Query failed: " + str(tx._last_executed))
        result = tx.fetchone()
        if result:
            log.msg("Item already stored in db: %s" % item, level=log.DEBUG)
        else:
            try:
                tx.execute(
                    "INSERT INTO " + db_table + " (item_id, item_name) "
                    "VALUES (%s, %s)",
                    (item_id, item['item_name'])
                )
                log.msg("Item stored in db: %s" % item, level=log.DEBUG)
            except MySQLdb.Error:
                print("## Query failed: " + str(tx._last_executed))

    def handle_error(self, e):
        log.err(e)
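Note that runInteraction hands _conditional_insert a transaction object in a worker thread: the SELECT-then-INSERT pair executes inside a single transaction (committed on success, rolled back on exception), so the crawl itself is never blocked by the database.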
settings.py
ITEM_PIPELINES = {
    'your_project.pipelines.your_projectPipeline': 300,
    'your_project.pipelines.MySQLStorePipeline': 600,
}
# note: ITEM_PIPELINES must be a dict, not a list; see
# https://stackoverflow.com/questions/37442907/scrapy-attributeerror-list-object-has-no-attribute-iteritems

DB_CONN = {
    'db': 'your_db',
    'user': 'your_username',
    'passwd': 'your_password',
    'host': 'your_host',
    'charset': 'utf8',
    'use_unicode': True,
}
Notes:
Replace all the your_***** placeholders with your own SQL credentials and names.
The code above assumes that your SQL table has only two columns, item_id and item_name; of course, you can modify the INSERT INTO query to match your schema.
If you have any problems, leave a comment.