I've got the following code:
for f in fileListProtocol.files:
if f['filetype'] == '-':
filename = os.path.join(directory['filename'], f['filename'])
print 'Downloading %s...' % (filename)
newFile = open(filename, 'w+')
d = ftpClient.retrieveFile(filename, FileConsumer(newFile))
d.addCallback(closeFile, newFile)
Unfortunately, after downloading several hundred of the 1000+ files in the directory in question I get an IOError about too many open files. Why is this when I should be closing each file after they've been downloaded? If there's a more idiomatic way to approach the whole task of downloading lots of files too, I'd love to hear it. Thanks.
Update: Jean-Paul's DeferredSemaphore
example plus Matt's FTPFile
did the trick. For some reason using a Cooperator
instead of DeferredSemaphore
would download a few files and then fail because the FTP connection would have died.
Assuming that you're using FTPClient
from twisted.protocols.ftp
... and I certainly hesitate before contradicting JP..
It seems that the FileConsumer
class you're passing to retrieveFile
will be adapted to IProtocol
by twisted.internet.protocol.ConsumerToProtocolAdapter
, which doesn't call unregisterProducer
, so FileConsumer
doesn't close the file object.
I've knocked up a quick protocol that you can use to receive the files. I think it should only open the file when appropriate. Totally untested, you'd use it in place of FileConsumer
in your code above and won't need the addCallback
.
from twisted.python import log
from twisted.internet import interfaces
from zope.interface import implements
class FTPFile(object):
"""
A consumer for FTP input that writes data to a file.
@ivar filename: a filename to be opened for writing.
"""
implements(interfaces.IProtocol)
def __init__(self, filename):
self.fObj = None
self.filename = filename
def makeConnection(self,transport)
self.fObj = open(self.filename,'wb')
log.info('Opened %s for writing' % self.filename)
def connectionLost(self,reason):
self.fObj.close()
log.info('Closed %s' % self.filename)
def dataReceived(self, bytes):
self.fObj.write(bytes)