Ticket #1037: sftpd.py

File sftpd.py, 50.9 KB (added by davidsarah, at 2010-05-12T06:04:02Z)

Finished recording patch 'New SFTP implementation: mutable files, read/write support, streaming download, Unicode filenames, and more

Line 
1
2import os, tempfile, heapq, binascii, traceback, sys
3from stat import S_IFREG, S_IFDIR
4
5from zope.interface import implements
6from twisted.python import components
7from twisted.application import service, strports
8from twisted.conch.ssh import factory, keys, session
9from twisted.conch.ssh.filetransfer import FileTransferServer, SFTPError, \
10     FX_NO_SUCH_FILE, FX_OP_UNSUPPORTED, FX_PERMISSION_DENIED, FX_EOF, \
11     FX_BAD_MESSAGE, FX_FAILURE
12from twisted.conch.ssh.filetransfer import FXF_READ, FXF_WRITE, FXF_APPEND, \
13     FXF_CREAT, FXF_TRUNC, FXF_EXCL
14from twisted.conch.interfaces import ISFTPServer, ISFTPFile, IConchUser
15from twisted.conch.avatar import ConchUser
16from twisted.conch.openssh_compat import primes
17from twisted.cred import portal
18
19from twisted.internet import defer
20from twisted.python.failure import Failure
21from twisted.internet.interfaces import IFinishableConsumer
22from foolscap.api import eventually
23from allmydata.util import deferredutil
24
25from allmydata.util.consumer import download_to_data
26from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
27     NoSuchChildError
28from allmydata.mutable.common import NotWriteableError
29from allmydata.immutable.upload import FileHandle
30
31from pycryptopp.cipher.aes import AES
32
33# twisted.conch.ssh.filetransfer generates this warning, but not when it is imported,
34# only on an error.
35import warnings
36warnings.filterwarnings("ignore", category=DeprecationWarning,
37    message="BaseException.message has been deprecated as of Python 2.6",
38    module=".*filetransfer", append=True)
39
40debug = False
41
42if debug:
43    def eventually_callback(d):
44        s = traceback.format_stack()
45        def _cb(res):
46            try:
47                print "CALLBACK %r %r" % (d, res)
48                d.callback(res)
49            except:  # pragma: no cover
50                print "Failed to callback %r" % (d,)
51                print "with %r" % (res,)
52                print "Original stack:"
53                print '!' + '!'.join(s)
54                traceback.print_exc()
55                raise
56        return lambda res: eventually(_cb, res)
57
58    def eventually_errback(d):
59        s = traceback.format_stack()
60        def _eb(err):
61            try:
62                print "ERRBACK", d, err
63                d.errback(err)
64            except:  # pragma: no cover
65                print "Failed to errback %r" % (d,)
66                print "with %r" % (err,)
67                print "Original stack:"
68                print '!' + '!'.join(s)
69                traceback.print_exc()
70                raise
71        return lambda err: eventually(_eb, err)
72else:
73    def eventually_callback(d):
74        return lambda res: eventually(d.callback, res)
75
76    def eventually_errback(d):
77        return lambda err: eventually(d.errback, err)
78
79
80def _raise_error(err):
81    if err is None:
82        return None
83    if debug:
84        print "TRACEBACK %r" % (err,)
85        #traceback.print_exc(err)
86
87    # The message argument to SFTPError must not reveal information that
88    # might compromise anonymity.
89
90    if err.check(SFTPError):
91        # original raiser of SFTPError has responsibility to ensure anonymity
92        raise err
93    if err.check(NoSuchChildError):
94        childname = err.value.args[0].encode('utf-8')
95        raise SFTPError(FX_NO_SUCH_FILE, childname)
96    if err.check(ExistingChildError):
97        msg = err.value.args[0].encode('utf-8')
98        # later versions of SFTP define FX_FILE_ALREADY_EXISTS, but version 3 doesn't
99        raise SFTPError(FX_PERMISSION_DENIED, msg)
100    if err.check(NotWriteableError):
101        msg = err.value.args[0].encode('utf-8')
102        raise SFTPError(FX_PERMISSION_DENIED, msg)
103    if err.check(NotImplementedError):
104        raise SFTPError(FX_OP_UNSUPPORTED, str(err.value))
105    if err.check(EOFError):
106        raise SFTPError(FX_EOF, "end of file reached")
107    if err.check(defer.FirstError):
108        _raise_error(err.value.subFailure)
109
110    # We assume that the type of error is not anonymity-sensitive.
111    raise SFTPError(FX_FAILURE, str(err.type))
112
113def _repr_flags(flags):
114    return "|".join([f for f in
115                     [(flags & FXF_READ) and "FXF_READ" or None,
116                      (flags & FXF_WRITE) and "FXF_WRITE" or None,
117                      (flags & FXF_APPEND) and "FXF_APPEND" or None,
118                      (flags & FXF_CREAT) and "FXF_CREAT" or None,
119                      (flags & FXF_TRUNC) and "FXF_TRUNC" or None,
120                      (flags & FXF_EXCL) and "FXF_EXCL" or None,
121                     ]
122                     if f])
123
124def _populate_attrs(childnode, metadata, writeable, size=None):
125    attrs = {}
126
127    # see webapi.txt for what these times mean
128    if metadata:
129        if "linkmotime" in metadata.get("tahoe", {}):
130            attrs["mtime"] = int(metadata["tahoe"]["linkmotime"])
131        elif "mtime" in metadata:
132            attrs["mtime"] = int(metadata["mtime"])
133
134        if "linkcrtime" in metadata.get("tahoe", {}):
135            attrs["createtime"] = int(metadata["tahoe"]["linkcrtime"])
136
137        if "ctime" in metadata:
138            attrs["ctime"] = int(metadata["ctime"])
139
140        # We would prefer to omit atime, but SFTP version 3 can only
141        # accept mtime if atime is also set.
142        attrs["atime"] = attrs["mtime"]
143
144    # The permissions must have the extra bits (040000 or 0100000),
145    # otherwise the client will not call openDirectory.
146
147    # Directories and unknown nodes have no size, and SFTP doesn't
148    # require us to make one up.
149    # childnode might be None, meaning that the file doesn't exist yet,
150    # but we're going to write it later.
151
152    if childnode and childnode.is_unknown():
153        perms = 0
154    elif childnode and IDirectoryNode.providedBy(childnode):
155        perms = S_IFDIR | 0770 
156    else:
157        # For files, omit the size if we don't immediately know it.
158        if childnode and size is None:
159            size = childnode.get_size()
160        if size is not None:
161            assert isinstance(size, (int, long)), repr(size)
162            attrs["size"] = size
163        perms = S_IFREG | 0660
164
165    if not writeable:
166        perms &= S_IFDIR | S_IFREG | 0555  # clear 'w' bits
167
168    attrs["permissions"] = perms
169
170    # We could set the SSH_FILEXFER_ATTR_FLAGS here:
171    # ENCRYPTED would always be true ("The file is stored on disk
172    # using file-system level transparent encryption.")
173    # SYSTEM, HIDDEN, ARCHIVE and SYNC would always be false.
174    # READONLY and IMMUTABLE would be set according to
175    # childnode.is_readonly() and childnode.is_immutable()
176    # for known nodes.
177    # However, twisted.conch.ssh.filetransfer only implements
178    # SFTP version 3, which doesn't include these flags.
179
180    return attrs
181
182
183class EncryptedTemporaryFile:
184    # not implemented: next, readline, readlines, xreadlines, writelines
185
186    def _crypt(self, offset, data):
187        # FIXME: use random-access AES (pycryptopp ticket #18)
188        offset_big = offset // 16
189        offset_small = offset % 16
190        iv = binascii.unhexlify("%032x" % offset_big)
191        cipher = AES(self.key, iv=iv)
192        cipher.process("\x00"*offset_small)
193        return cipher.process(data)
194
195    def __init__(self):
196        self.file = tempfile.TemporaryFile()
197        self.key = os.urandom(16)  # AES-128
198
199    def close(self):
200        self.file.close()
201
202    def flush(self):
203        self.file.flush()
204
205    def seek(self, offset, whence=os.SEEK_SET):
206        if debug: print ".seek(%r, %r)" % (offset, whence)
207        self.file.seek(offset, whence)
208
209    def tell(self):
210        offset = self.file.tell()
211        if debug: print ".offset = %r" % (offset,)
212        return offset
213
214    def read(self, size=-1):
215        if debug: print ".read(%r)" % (size,)
216        index = self.file.tell()
217        ciphertext = self.file.read(size)
218        plaintext = self._crypt(index, ciphertext)
219        return plaintext
220
221    def write(self, plaintext):
222        if debug: print ".write(%r)" % (plaintext,)
223        index = self.file.tell()
224        ciphertext = self._crypt(index, plaintext)
225        self.file.write(ciphertext)
226
227    def truncate(self, newsize):
228        if debug: print ".truncate(%r)" % (newsize,)
229        self.file.truncate(newsize)
230
231
232class OverwriteableFileConsumer:
233    implements(IFinishableConsumer)
234    """I act both as a consumer for the download of the original file contents, and as a
235    wrapper for a temporary file that records the downloaded data and any overwrites.
236    I use a priority queue to keep track of which regions of the file have been overwritten
237    but not yet downloaded, so that the download does not clobber overwritten data.
238    I use another priority queue to record milestones at which to make callbacks
239    indicating that a given number of bytes have been downloaded.
240
241    The temporary file reflects the contents of the file that I represent, except that:
242     - regions that have neither been downloaded nor overwritten, if present,
243       contain zeroes.
244     - the temporary file may be shorter than the represented file (it is never longer).
245       The latter's current size is stored in self.current_size.
246
247    This abstraction is mostly independent of SFTP. Consider moving it, if it is found
248    useful for other frontends."""
249
250    def __init__(self, check_abort, download_size, tempfile_maker):
251        self.check_abort = check_abort
252        self.download_size = download_size
253        self.current_size = download_size
254        self.f = tempfile_maker()
255        self.downloaded = 0
256        self.milestones = []  # empty heap of (offset, d)
257        self.overwrites = []  # empty heap of (start, end)
258        self.done = self.when_reached(download_size)  # adds a milestone
259        self.producer = None
260
261    def get_file(self):
262        return self.f
263
264    def get_current_size(self):
265        return self.current_size
266
267    def set_current_size(self, size):
268        if debug: print "set_current_size(%r), current_size = %r, downloaded = %r" % (size, self.current_size, self.downloaded)
269        if size < self.current_size or size < self.downloaded:
270            self.f.truncate(size)
271        self.current_size = size
272        if size < self.download_size:
273            self.download_size = size
274        if self.downloaded >= self.download_size:
275            self.finish()
276
277    def registerProducer(self, p, streaming):
278        self.producer = p
279        if streaming:
280            # call resumeProducing once to start things off
281            p.resumeProducing()
282        else:
283            while not self.done:
284                p.resumeProducing()
285
286    def write(self, data):
287        if debug: print "write(%r)" % (data,)
288        if self.check_abort():
289            self.close()
290            return
291
292        if self.downloaded >= self.download_size:
293            return
294
295        next_downloaded = self.downloaded + len(data)
296        if next_downloaded > self.download_size:
297            data = data[:(self.download_size - self.downloaded)]
298
299        while len(self.overwrites) > 0:
300            (start, end) = self.overwrites[0]
301            if start >= next_downloaded:
302                # This and all remaining overwrites are after the data we just downloaded.
303                break
304            if start > self.downloaded:
305                # The data we just downloaded has been partially overwritten.
306                # Write the prefix of it that precedes the overwritten region.
307                self.f.seek(self.downloaded)
308                self.f.write(data[:(start - self.downloaded)])
309
310            # This merges consecutive overwrites if possible, which allows us to detect the
311            # case where the download can be stopped early because the remaining region
312            # to download has already been fully overwritten.
313            heapq.heappop(self.overwrites)
314            while len(self.overwrites) > 0:
315                (start1, end1) = self.overwrites[0]
316                if start1 > end:
317                    break
318                end = end1
319                heapq.heappop(self.overwrites)
320
321            if end >= next_downloaded:
322                # This overwrite extends past the downloaded data, so there is no
323                # more data to consider on this call.
324                heapq.heappush(self.overwrites, (next_downloaded, end))
325                self._update_downloaded(next_downloaded)
326                return
327            elif end >= self.downloaded:
328                data = data[(end - self.downloaded):]
329                self._update_downloaded(end)
330
331        self.f.seek(self.downloaded)
332        self.f.write(data)
333        self._update_downloaded(next_downloaded)
334
335    def _update_downloaded(self, new_downloaded):
336        self.downloaded = new_downloaded
337        milestone = new_downloaded
338        if len(self.overwrites) > 0:
339            (start, end) = self.overwrites[0]
340            if start <= new_downloaded and end > milestone:
341                milestone = end
342
343        while len(self.milestones) > 0:
344            (next, d) = self.milestones[0]
345            if next > milestone:
346                return
347            if debug: print "MILESTONE %r %r" % (next, d)
348            heapq.heappop(self.milestones)
349            eventually_callback(d)(None)
350
351        if milestone >= self.download_size:
352            self.finish()
353
354    def overwrite(self, offset, data):
355        if debug: print "overwrite(%r, %r)" % (offset, data)
356        if offset > self.download_size and offset > self.current_size:
357            # Normally writing at an offset beyond the current end-of-file
358            # would leave a hole that appears filled with zeroes. However, an
359            # EncryptedTemporaryFile doesn't behave like that (if there is a
360            # hole in the file on disk, the zeroes that are read back will be
361            # XORed with the keystream). So we must explicitly write zeroes in
362            # the gap between the current EOF and the offset.
363
364            self.f.seek(self.current_size)
365            self.f.write("\x00" * (offset - self.current_size))           
366        else:
367            self.f.seek(offset)
368        self.f.write(data)
369        end = offset + len(data)
370        self.current_size = max(self.current_size, end)
371        if end > self.downloaded:
372            heapq.heappush(self.overwrites, (offset, end))
373
374    def read(self, offset, length):
375        """When the data has been read, callback the Deferred that we return with this data.
376        Otherwise errback the Deferred that we return.
377        The caller must perform no more overwrites until the Deferred has fired."""
378
379        if debug: print "read(%r, %r), current_size = %r" % (offset, length, self.current_size)
380        if offset >= self.current_size:
381            def _eof(): raise EOFError("read past end of file")
382            return defer.execute(_eof)
383
384        if offset + length > self.current_size:
385            length = self.current_size - offset
386
387        needed = min(offset + length, self.download_size)
388        d = self.when_reached(needed)
389        def _reached(ign):
390            # It is not necessarily the case that self.downloaded >= needed, because
391            # the file might have been truncated (thus truncating the download) and
392            # then extended.
393
394            assert self.current_size >= offset + length, (self.current_size, offset, length)
395            if debug: print "!!! %r" % (self.f,)
396            self.f.seek(offset)
397            return self.f.read(length)
398        d.addCallback(_reached)
399        return d
400
401    def when_reached(self, index):
402        if debug: print "when_reached(%r)" % (index,)
403        if index <= self.downloaded:  # already reached
404            if debug: print "already reached %r" % (index,)
405            return defer.succeed(None)
406        d = defer.Deferred()
407        def _reached(ign):
408            if debug: print "reached %r" % (index,)
409            return ign
410        d.addCallback(_reached)
411        heapq.heappush(self.milestones, (index, d))
412        return d
413
414    def when_done(self):
415        return self.done
416
417    def finish(self):
418        while len(self.milestones) > 0:
419            (next, d) = self.milestones[0]
420            if debug: print "MILESTONE FINISH %r %r" % (next, d)
421            heapq.heappop(self.milestones)
422            # The callback means that the milestone has been reached if
423            # it is ever going to be. Note that the file may have been
424            # truncated to before the milestone.
425            eventually_callback(d)(None)
426
427        # FIXME: causes spurious failures
428        #self.unregisterProducer()
429
430    def close(self):
431        self.finish()
432        self.f.close()
433
434    def unregisterProducer(self):
435        if self.producer:
436            self.producer.stopProducing()
437            self.producer = None
438
439
440SIZE_THRESHOLD = 1000
441
442def _make_sftp_file(check_abort, flags, convergence, parent=None, childname=None, filenode=None, metadata=None):
443    if not (flags & (FXF_WRITE | FXF_CREAT)) and (flags & FXF_READ) and filenode and \
444       not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
445        return ShortReadOnlySFTPFile(filenode, metadata)
446    else:
447        return GeneralSFTPFile(check_abort, flags, convergence,
448                               parent=parent, childname=childname, filenode=filenode, metadata=metadata)
449
450
451class ShortReadOnlySFTPFile:
452    implements(ISFTPFile)
453    """I represent a file handle to a particular file on an SFTP connection.
454    I am used only for short immutable files opened in read-only mode.
455    The file contents are downloaded to memory when I am created."""
456
457    def __init__(self, filenode, metadata):
458        assert IFileNode.providedBy(filenode), filenode
459        self.filenode = filenode
460        self.metadata = metadata
461        self.async = download_to_data(filenode)
462        self.closed = False
463
464    def readChunk(self, offset, length):
465        if self.closed:
466            def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
467            return defer.execute(_closed)
468
469        d = defer.Deferred()
470        def _read(data):
471            if debug: print "_read(%r) in readChunk(%r, %r)" % (data, offset, length)
472
473            # "In response to this request, the server will read as many bytes as it
474            #  can from the file (up to 'len'), and return them in a SSH_FXP_DATA
475            #  message.  If an error occurs or EOF is encountered before reading any
476            #  data, the server will respond with SSH_FXP_STATUS.  For normal disk
477            #  files, it is guaranteed that this will read the specified number of
478            #  bytes, or up to end of file."
479            #
480            # i.e. we respond with an EOF error iff offset is already at EOF.
481
482            if offset >= len(data):
483                eventually_errback(d)(SFTPError(FX_EOF, "read at or past end of file"))
484            else:
485                eventually_callback(d)(data[offset:min(offset+length, len(data))])
486            return data
487        self.async.addCallbacks(_read, eventually_errback(d))
488        return d
489
490    def writeChunk(self, offset, data):
491        def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
492        return defer.execute(_denied)
493
494    def close(self):
495        self.closed = True
496        return defer.succeed(None)
497
498    def getAttrs(self):
499        if debug: print "GETATTRS(file)"
500        if self.closed:
501            def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
502            return defer.execute(_closed)
503
504        return defer.succeed(_populate_attrs(self.filenode, self.metadata, False))
505
506    def setAttrs(self, attrs):
507        if debug: print "SETATTRS(file) %r" % (attrs,)
508        def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
509        return defer.execute(_denied)
510
511
512class GeneralSFTPFile:
513    implements(ISFTPFile)
514    """I represent a file handle to a particular file on an SFTP connection.
515    I wrap an instance of OverwriteableFileConsumer, which is responsible for
516    storing the file contents. In order to allow write requests to be satisfied
517    immediately, there is effectively a FIFO queue between requests made to this
518    file handle, and requests to my OverwriteableFileConsumer. This queue is
519    implemented by the callback chain of self.async."""
520
521    def __init__(self, check_abort, flags, convergence, parent=None, childname=None, filenode=None, metadata=None):
522        self.check_abort = check_abort
523        self.flags = flags
524        self.convergence = convergence
525        self.parent = parent
526        self.childname = childname
527        self.filenode = filenode
528        self.metadata = metadata
529        self.async = defer.succeed(None)
530        self.closed = False
531       
532        # self.consumer should only be relied on in callbacks for self.async, since it might
533        # not be set before then.
534        self.consumer = None
535
536        if (flags & FXF_TRUNC) or not filenode:
537            # We're either truncating or creating the file, so we don't need the old contents.
538            assert flags & FXF_CREAT, flags
539            self.consumer = OverwriteableFileConsumer(self.check_abort, 0,
540                                                      tempfile_maker=EncryptedTemporaryFile)
541            self.consumer.finish()
542        else:
543            assert IFileNode.providedBy(filenode), filenode
544
545            # TODO: use download interface described in #993 when implemented.
546            if filenode.is_mutable():
547                self.async.addCallback(lambda ign: filenode.download_best_version())
548                def _downloaded(data):
549                    self.consumer = OverwriteableFileConsumer(self.check_abort, len(data),
550                                                              tempfile_maker=tempfile.TemporaryFile)
551                    self.consumer.write(data)
552                    self.consumer.finish()
553                    return None
554                self.async.addCallback(_downloaded)
555            else:
556                download_size = filenode.get_size()
557                assert download_size is not None
558                self.consumer = OverwriteableFileConsumer(self.check_abort, download_size,
559                                                          tempfile_maker=tempfile.TemporaryFile)
560                self.async.addCallback(lambda ign: filenode.read(self.consumer, 0, None))
561
562
563    def readChunk(self, offset, length):
564        if not (self.flags & FXF_READ):
565            return defer.fail(SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for reading"))
566
567        if self.closed:
568            def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
569            return defer.execute(_closed)
570
571        d = defer.Deferred()
572        def _read(ign):
573            if debug: print "_read in readChunk(%r, %r)" % (offset, length)
574            d2 = self.consumer.read(offset, length)
575            d2.addErrback(_raise_error)
576            d2.addCallbacks(eventually_callback(d), eventually_errback(d))
577            # It is correct to drop d2 here.
578            return None
579        self.async.addCallbacks(_read, eventually_errback(d))
580        return d
581
582    def writeChunk(self, offset, data):
583        if debug: print "writeChunk(%r, %r)" % (offset, data)
584        if not (self.flags & FXF_WRITE):
585            def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
586            return defer.execute(_denied)
587
588        if self.closed:
589            def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
590            return defer.execute(_closed)
591
592        # Note that we return without waiting for the write to occur. Reads and
593        # close wait for prior writes, and will fail if any prior operation failed.
594        # This is ok because SFTP makes no guarantee that the request completes
595        # before the write. In fact it explicitly allows write errors to be delayed
596        # until close:
597        #   "One should note that on some server platforms even a close can fail.
598        #    This can happen e.g. if the server operating system caches writes,
599        #    and an error occurs while flushing cached writes during the close."
600
601        def _write(ign):
602            # FXF_APPEND means that we should always write at the current end of file.
603            write_offset = offset
604            if self.flags & FXF_APPEND:
605                write_offset = self.consumer.get_current_size()
606
607            self.consumer.overwrite(write_offset, data)
608            return None
609        self.async.addCallback(_write)
610        # don't addErrback to self.async, just allow subsequent async ops to fail.
611        return defer.succeed(None)
612
613    def close(self):
614        if self.closed:
615            return defer.succeed(None)
616
617        # This means that close has been called, not that the close has succeeded.
618        self.closed = True
619
620        if not (self.flags & (FXF_WRITE | FXF_CREAT)):
621            return defer.execute(self.consumer.close)
622
623        def _close(ign):
624            d2 = self.consumer.when_done()
625            if self.filenode and self.filenode.is_mutable():
626                d2.addCallback(lambda ign: self.consumer.get_current_size())
627                d2.addCallback(lambda size: self.consumer.read(0, size))
628                d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
629            else:
630                def _add_file(ign):
631                    u = FileHandle(self.consumer.get_file(), self.convergence)
632                    return self.parent.add_file(self.childname, u)
633                d2.addCallback(_add_file)
634
635            d2.addCallback(lambda ign: self.consumer.close())
636            return d2
637        self.async.addCallback(_close)
638
639        d = defer.Deferred()
640        self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
641        return d
642
643    def getAttrs(self):
644        if debug: print "GETATTRS(file)"
645
646        if self.closed:
647            def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot get attributes for a closed file handle")
648            return defer.execute(_closed)
649
650        # Optimization for read-only handles, when we already know the metadata.
651        if not(self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
652            return defer.succeed(_populate_attrs(self.filenode, self.metadata, False))
653
654        d = defer.Deferred()
655        def _get(ign):
656            # FIXME: pass correct value for writeable
657            # self.filenode might be None, but that's ok.
658            attrs = _populate_attrs(self.filenode, self.metadata, False,
659                                    size=self.consumer.get_current_size())
660            eventually_callback(d)(attrs)
661            return None
662        self.async.addCallbacks(_get, eventually_errback(d))
663        return d
664
665    def setAttrs(self, attrs):
666        if debug: print "SETATTRS(file) %r" % (attrs,)
667        if not (self.flags & FXF_WRITE):
668            def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "file handle was not opened for writing")
669            return defer.execute(_denied)
670
671        if self.closed:
672            def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot set attributes for a closed file handle")
673            return defer.execute(_closed)
674
675        if not "size" in attrs:
676            return defer.succeed(None)
677
678        size = attrs["size"]
679        if not isinstance(size, (int, long)) or size < 0:
680            def _bad(): raise SFTPError(FX_BAD_MESSAGE, "new size is not a valid nonnegative integer")
681            return defer.execute(_bad)
682
683        d = defer.Deferred()
684        def _resize(ign):
685            self.consumer.set_current_size(size)
686            eventually_callback(d)(None)
687            return None
688        self.async.addCallbacks(_resize, eventually_errback(d))
689        return d
690
691class SFTPUser(ConchUser):
692    def __init__(self, check_abort, client, rootnode, username, convergence):
693        ConchUser.__init__(self)
694        self.channelLookup["session"] = session.SSHSession
695        self.subsystemLookup["sftp"] = FileTransferServer
696
697        self.check_abort = check_abort
698        self.client = client
699        self.root = rootnode
700        self.username = username
701        self.convergence = convergence
702
703class StoppableList:
704    def __init__(self, items):
705        self.items = items
706    def __iter__(self):
707        for i in self.items:
708            yield i
709    def close(self):
710        pass
711
712import array
713import stat
714
715from time import time, strftime, localtime
716
def lsLine(name, attrs):
    """Return an 'ls -l'-style longname line for the entry *name*.

    *attrs* is an SFTP attribute dict: "permissions" is required, while
    "mtime" (defaults to the epoch) and "size" (rendered as "?" when
    absent) are optional. The owner and group are always reported as
    "tahoe" and the link count as 1, because Tahoe-LAFS has no real
    equivalents for them.
    """
    st_uid = "tahoe"
    st_gid = "tahoe"
    st_mtime = attrs.get("mtime", 0)
    st_mode = attrs["permissions"]
    # TODO: check that clients are okay with this being a "?".
    # (They should be because the longname is intended for human
    # consumption.)
    st_size = attrs.get("size", "?")
    # We don't know how many links there really are to this object.
    st_nlink = 1

    # Based on <http://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
    # We can't call the version in Twisted because we might have a version earlier than
    # <http://twistedmatrix.com/trac/changeset/25412> (released in Twisted 8.2).
    # Unlike that version, the permission string here is built from a plain
    # list of characters rather than array.array('c', ...): the 'c' typecode
    # is Python-2-only (it was removed in Python 3) and a list is equivalent.

    mode = st_mode
    perms = list("-" * 10)
    # file-type character
    ft = stat.S_IFMT(mode)
    if stat.S_ISDIR(ft): perms[0] = 'd'
    elif stat.S_ISCHR(ft): perms[0] = 'c'
    elif stat.S_ISBLK(ft): perms[0] = 'b'
    elif stat.S_ISREG(ft): perms[0] = '-'
    elif stat.S_ISFIFO(ft): perms[0] = 'f'
    elif stat.S_ISLNK(ft): perms[0] = 'l'
    elif stat.S_ISSOCK(ft): perms[0] = 's'
    else: perms[0] = '?'
    # user
    if mode&stat.S_IRUSR:perms[1] = 'r'
    if mode&stat.S_IWUSR:perms[2] = 'w'
    if mode&stat.S_IXUSR:perms[3] = 'x'
    # group
    if mode&stat.S_IRGRP:perms[4] = 'r'
    if mode&stat.S_IWGRP:perms[5] = 'w'
    if mode&stat.S_IXGRP:perms[6] = 'x'
    # other
    if mode&stat.S_IROTH:perms[7] = 'r'
    if mode&stat.S_IWOTH:perms[8] = 'w'
    if mode&stat.S_IXOTH:perms[9] = 'x'
    # suid/sgid never set

    l = "".join(perms)
    l += str(st_nlink).rjust(5) + ' '
    un = str(st_uid)
    l += un.ljust(9)
    gr = str(st_gid)
    l += gr.ljust(9)
    sz = str(st_size)
    l += sz.rjust(8)
    l += ' '
    # Timestamps older than about six months get the "Mon DD  YYYY" form;
    # more recent ones get "Mon DD HH:MM", as 'ls -l' does.
    sixmo = 60 * 60 * 24 * 7 * 26
    if st_mtime + sixmo < time(): # last edited more than 6mo ago
        l += strftime("%b %d  %Y ", localtime(st_mtime))
    else:
        l += strftime("%b %d %H:%M ", localtime(st_mtime))
    l += name
    return l
774
775
776class SFTPHandler:
777    implements(ISFTPServer)
778    def __init__(self, user):
779        if debug: print "Creating SFTPHandler from", user
780        self.check_abort = user.check_abort
781        self.client = user.client
782        self.root = user.root
783        self.username = user.username
784        self.convergence = user.convergence
785
786    def gotVersion(self, otherVersion, extData):
787        if debug: print "GOTVERSION %r %r" % (otherVersion, extData)
788        return {}
789
790    def openFile(self, pathstring, flags, attrs):
791        if debug: print "OPENFILE %r %r %r %r" % (pathstring, flags, _repr_flags(flags), attrs)
792        # this is used for both reading and writing.
793
794        # First exclude invalid combinations of flags.
795
796        # /usr/bin/sftp 'get' gives us FXF_READ, while 'put' on a new file
797        # gives FXF_WRITE | FXF_CREAT | FXF_TRUNC. I'm guessing that 'put' on an
798        # existing file gives the same.
799
800        if not (flags & (FXF_READ | FXF_WRITE)):
801            raise SFTPError(FX_BAD_MESSAGE,
802                            "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
803
804        if not (flags & FXF_CREAT):
805            if flags & FXF_TRUNC:
806                raise SFTPError(FX_BAD_MESSAGE,
807                                "invalid file open flags: FXF_TRUNC cannot be set without FXF_CREAT")
808            if flags & FXF_EXCL:
809                raise SFTPError(FX_BAD_MESSAGE,
810                                "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
811
812        path = self._path_from_string(pathstring)
813        if not path:
814            raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
815
816        # The combination of flags is potentially valid. Now there are two major cases:
817        #
818        #  1. The path is specified as /uri/FILECAP, with no parent directory.
819        #     If the FILECAP is mutable and writeable, then we can open it in write-only
820        #     or read/write mode (non-exclusively), otherwise we can only open it in
821        #     read-only mode. The open should succeed immediately as long as FILECAP is
822        #     a valid known filecap that grants the required permission.
823        #
824        #  2. The path is specified relative to a parent. We find the parent dirnode and
825        #     get the child's URI and metadata if it exists. There are four subcases:
826        #       a. the child does not exist: FXF_CREAT must be set, and we must be able
827        #          to write to the parent directory.
828        #       b. the child exists but is not a valid known filecap: fail
829        #       c. the child is mutable: if we are trying to open it write-only or
830        #          read/write, then we must be able to write to the file.
831        #       d. the child is immutable: if we are trying to open it write-only or
832        #          read/write, then we must be able to write to the parent directory.
833        #
834        # To reduce latency, open succeeds as soon as these conditions are met, even
835        # though there might be a failure in downloading the existing file or uploading
836        # a new one.
837        #
838        # Note that the permission checks below are for more precise error reporting on
839        # the open call; later operations would fail even if we did not make these checks.
840
841        stash = {'parent': None}
842        d = self._get_root(path)
843        def _got_root((root, path)):
844            if root.is_unknown():
845                raise SFTPError(FX_PERMISSION_DENIED,
846                                "cannot open an unknown cap (or child of an unknown directory). "
847                                "Upgrading the gateway to a later Tahoe-LAFS version may help")
848            if not path:
849                # case 1
850                if not IFileNode.providedBy(root):
851                    raise SFTPError(FX_PERMISSION_DENIED,
852                                    "cannot open a directory cap")
853                if (flags & FXF_WRITE) and root.is_readonly():
854                    raise SFTPError(FX_PERMISSION_DENIED,
855                                    "cannot write to a non-writeable filecap without a parent directory")
856                if flags & FXF_EXCL:
857                    raise SFTPError(FX_PERMISSION_DENIED,
858                                    "cannot create a file exclusively when it already exists")
859
860                return _make_sftp_file(self.check_abort, flags, self.convergence, filenode=root)
861            else:
862                # case 2
863                childname = path[-1]
864                if debug: print "case 2: childname = %r, path[:-1] = %r" % (childname, path[:-1])
865                d2 = root.get_child_at_path(path[:-1])
866                def _got_parent(parent):
867                    if debug: print "_got_parent(%r)" % (parent,)
868                    stash['parent'] = parent
869
870                    if flags & FXF_EXCL:
871                        # FXF_EXCL means that the link to the file (not the file itself) must
872                        # be created atomically wrt updates by this storage client.
873                        # That is, we need to create the link before returning success to the
874                        # SFTP open request (and not just on close, as would normally be the
875                        # case). We make the link initially point to a zero-length LIT file,
876                        # which is consistent with what might happen on a POSIX filesystem.
877
878                        if parent.is_readonly():
879                            raise SFTPError(FX_PERMISSION_DENIED,
880                                            "cannot create a file exclusively when the parent directory is read-only")
881
882                        # 'overwrite=False' ensures failure if the link already exists.
883                        # FIXME: should use a single call to set_uri and return (child, metadata) (#1035)
884                        d3 = parent.set_uri(childname, None, "URI:LIT:", overwrite=False)
885                        def _seturi_done(child):
886                            stash['child'] = child
887                            return parent.get_metadata_for(childname)
888                        d3.addCallback(_seturi_done)
889                        d3.addCallback(lambda metadata: (stash['child'], metadata))
890                        return d3
891                    else:
892                        if debug: print "get_child_and_metadata"
893                        return parent.get_child_and_metadata(childname)
894                d2.addCallback(_got_parent)
895
896                def _got_child( (filenode, metadata) ):
897                    if debug: print "_got_child((%r, %r))" % (filenode, metadata)
898                    parent = stash['parent']
899                    if filenode.is_unknown():
900                        raise SFTPError(FX_PERMISSION_DENIED,
901                                        "cannot open an unknown cap. Upgrading the gateway "
902                                        "to a later Tahoe-LAFS version may help")
903                    if not IFileNode.providedBy(filenode):
904                        raise SFTPError(FX_PERMISSION_DENIED,
905                                        "cannot open a directory as if it were a file")
906                    if (flags & FXF_WRITE) and filenode.is_mutable() and filenode.is_readonly():
907                        raise SFTPError(FX_PERMISSION_DENIED,
908                                        "cannot open a read-only mutable file for writing")
909                    if (flags & FXF_WRITE) and parent.is_readonly():
910                        raise SFTPError(FX_PERMISSION_DENIED,
911                                        "cannot open a file for writing when the parent directory is read-only")
912
913                    return _make_sftp_file(self.check_abort, flags, self.convergence, parent=parent,
914                                           childname=childname, filenode=filenode, metadata=metadata)
915                def _no_child(f):
916                    if debug: print "_no_child(%r)" % (f,)
917                    f.trap(NoSuchChildError)
918                    parent = stash['parent']
919                    if parent is None:
920                        return f
921                    if not (flags & FXF_CREAT):
922                        raise SFTPError(FX_NO_SUCH_FILE,
923                                        "the file does not exist, and was not opened with the creation (CREAT) flag")
924                    if parent.is_readonly():
925                        raise SFTPError(FX_PERMISSION_DENIED,
926                                        "cannot create a file when the parent directory is read-only")
927
928                    return _make_sftp_file(self.check_abort, flags, self.convergence, parent=parent,
929                                           childname=childname)
930                d2.addCallbacks(_got_child, _no_child)
931                return d2
932        d.addCallback(_got_root)
933        d.addErrback(_raise_error)
934        return d
935
936    def removeFile(self, pathstring):
937        if debug: print "REMOVEFILE %r" % (pathstring,)
938        path = self._path_from_string(pathstring)
939        return self._remove_object(path, must_be_file=True)
940
941    def renameFile(self, oldpathstring, newpathstring):
942        if debug: print "RENAMEFILE %r %r" % (oldpathstring, newpathstring)
943        fromPath = self._path_from_string(oldpathstring)
944        toPath = self._path_from_string(newpathstring)
945
946        # the target directory must already exist
947        d = deferredutil.gatherResults([self._get_parent(fromPath),
948                                        self._get_parent(toPath)])
949        def _got( (fromPair, toPair) ):
950            (fromParent, fromChildname) = fromPair
951            (toParent, toChildname) = toPair
952
953            # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.5>
954            # "It is an error if there already exists a file with the name specified
955            #  by newpath."
956            # FIXME: use move_child_to_path to avoid possible data loss due to #943
957            d = fromParent.move_child_to(fromChildname, toParent, toChildname, overwrite=False)
958            #d = parent.move_child_to_path(fromChildname, toRoot, toPath[:-1],
959            #                              toPath[-1], overwrite=False)
960            return d
961        d.addCallback(_got)
962        d.addErrback(_raise_error)
963        return d
964
965    def makeDirectory(self, pathstring, attrs):
966        if debug: print "MAKEDIRECTORY %r %r" % (pathstring, attrs)
967        path = self._path_from_string(pathstring)
968        metadata = self._attrs_to_metadata(attrs)
969        d = self._get_root(path)
970        d.addCallback(lambda (root,path):
971                      self._get_or_create_directories(root, path, metadata))
972        d.addErrback(_raise_error)
973        return d
974
975    def _get_or_create_directories(self, node, path, metadata):
976        if not IDirectoryNode.providedBy(node):
977            # unfortunately it is too late to provide the name of the
978            # blocking file in the error message.
979            raise SFTPError(FX_PERMISSION_DENIED,
980                            "cannot create directory because there "
981                            "is a file in the way") # close enough
982        if not path:
983            return defer.succeed(node)
984        d = node.get(path[0])
985        def _maybe_create(f):
986            f.trap(NoSuchChildError)
987            return node.create_subdirectory(path[0])
988        d.addErrback(_maybe_create)
989        d.addCallback(self._get_or_create_directories, path[1:], metadata)
990        d.addErrback(_raise_error)
991        return d
992
993    def removeDirectory(self, pathstring):
994        if debug: print "REMOVEDIRECTORY %r" % (pathstring,)
995        path = self._path_from_string(pathstring)
996        return self._remove_object(path, must_be_directory=True)
997
998    def _remove_object(self, path, must_be_directory=False, must_be_file=False):
999        d = defer.maybeDeferred(self._get_parent, path)
1000        def _got_parent( (parent, childname) ):
1001            d2 = parent.get(childname)
1002            def _got_child(child):
1003                # Unknown children can be removed by either removeFile or removeDirectory.
1004                if must_be_directory and IFileNode.providedBy(child):
1005                    raise SFTPError(FX_PERMISSION_DENIED, "rmdir called on a file")
1006                if must_be_file and IDirectoryNode.providedBy(child):
1007                    raise SFTPError(FX_PERMISSION_DENIED, "rmfile called on a directory")
1008                return parent.delete(childname)
1009            d2.addCallback(_got_child)
1010            return d2
1011        d.addCallback(_got_parent)
1012        d.addErrback(_raise_error)
1013        return d
1014
1015    def openDirectory(self, pathstring):
1016        if debug: print "OPENDIRECTORY %r" % (pathstring,)
1017        path = self._path_from_string(pathstring)
1018        if debug: print " PATH %r" % (path,)
1019        d = self._get_node_and_metadata_for_path(path)
1020        def _list( (dirnode, metadata) ):
1021            if dirnode.is_unknown():
1022                raise SFTPError(FX_PERMISSION_DENIED,
1023                                "cannot list an unknown cap as a directory. Upgrading the gateway "
1024                                "to a later Tahoe-LAFS version may help")
1025            if not IDirectoryNode.providedBy(dirnode):
1026                raise SFTPError(FX_PERMISSION_DENIED,
1027                                "cannot list a file as if it were a directory")
1028            d2 = dirnode.list()
1029            def _render(children):
1030                parent_writeable = not dirnode.is_readonly()
1031                results = []
1032                for filename, (node, metadata) in children.iteritems():
1033                    # The file size may be cached or absent.
1034                    writeable = parent_writeable and (node.is_unknown() or
1035                                                      not (node.is_mutable() and node.is_readonly()))
1036                    attrs = _populate_attrs(node, metadata, writeable)
1037                    filename_utf8 = filename.encode('utf-8')
1038                    longname = lsLine(filename_utf8, attrs)
1039                    results.append( (filename_utf8, longname, attrs) )
1040                return StoppableList(results)
1041            d2.addCallback(_render)
1042            return d2
1043        d.addCallback(_list)
1044        d.addErrback(_raise_error)
1045        return d
1046
1047    def getAttrs(self, pathstring, followLinks):
1048        if debug: print "GETATTRS %r %r" % (pathstring, followLinks)
1049        d = self._get_node_and_metadata_for_path(self._path_from_string(pathstring))
1050        def _render( (node, metadata) ):
1051            # When asked about a specific file, report its current size.
1052            # TODO: the modification time for a mutable file should be
1053            # reported as the update time of the best version. But that
1054            # information isn't currently stored in mutable shares, I think.
1055            d2 = node.get_current_size()
1056            def _got_size(size):
1057                # FIXME: pass correct value for writeable
1058                attrs = _populate_attrs(node, metadata, False, size=size)
1059                return attrs
1060            d2.addCallback(_got_size)
1061            return d2
1062        d.addCallback(_render)
1063        d.addErrback(_raise_error)
1064        def _done(res):
1065            if debug: print " DONE %r" % (res,)
1066            return res
1067        d.addBoth(_done)
1068        return d
1069
1070    def setAttrs(self, pathstring, attrs):
1071        if debug: print "SETATTRS %r %r" % (pathstring, attrs)
1072        if "size" in attrs:
1073            # this would require us to download and re-upload the truncated/extended
1074            # file contents
1075            raise SFTPError(FX_OP_UNSUPPORTED, "setAttrs wth size attribute")
1076        return None
1077
1078    def readLink(self, pathstring):
1079        if debug: print "READLINK %r" % (pathstring,)
1080        raise SFTPError(FX_OP_UNSUPPORTED, "readLink")
1081
1082    def makeLink(self, linkPathstring, targetPathstring):
1083        if debug: print "MAKELINK %r %r" % (linkPathstring, targetPathstring)
1084        raise SFTPError(FX_OP_UNSUPPORTED, "makeLink")
1085
1086    def extendedRequest(self, extendedName, extendedData):
1087        if debug: print "EXTENDEDREQUEST %r %r" % (extendedName, extendedData)
1088        # Client 'df' command requires 'statvfs@openssh.com' extension
1089        # (but there's little point to implementing that since we would only
1090        # have faked values to report).
1091        raise SFTPError(FX_OP_UNSUPPORTED, "extendedRequest %r" % extendedName)
1092
1093    def realPath(self, pathstring):
1094        if debug: print "REALPATH %r" % (pathstring,)
1095        return "/" + "/".join(self._path_from_string(pathstring))
1096
1097    def _path_from_string(self, pathstring):
1098        if debug: print "CONVERT %r" % (pathstring,)
1099
1100        # The home directory is the root directory.
1101        pathstring = pathstring.strip("/")
1102        if pathstring == "" or pathstring == ".":
1103            path_utf8 = []
1104        else:
1105            path_utf8 = pathstring.split("/")
1106
1107        # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.2>
1108        # "Servers SHOULD interpret a path name component ".." as referring to
1109        #  the parent directory, and "." as referring to the current directory."
1110        path = []
1111        for p_utf8 in path_utf8:
1112            if p_utf8 == "..":
1113                # ignore excess .. components at the root
1114                if len(path) > 0:
1115                    path = path[:-1]
1116            elif p_utf8 != ".":
1117                try:
1118                    p = p_utf8.decode('utf-8', 'strict')
1119                except UnicodeError:
1120                    raise SFTPError(FX_NO_SUCH_FILE, "path could not be decoded as UTF-8")
1121                path.append(p)
1122
1123        if debug: print " PATH %r" % (path,)
1124        return path
1125
1126    def _get_node_and_metadata_for_path(self, path):
1127        d = self._get_root(path)
1128        def _got_root( (root, path) ):
1129            if debug: print " ROOT %r" % (root,)
1130            if debug: print " PATH %r" % (path,)
1131            if path:
1132                return root.get_child_and_metadata_at_path(path)
1133            else:
1134                return (root,{})
1135        d.addCallback(_got_root)
1136        return d
1137
1138    def _get_root(self, path):
1139        # return (root, remaining_path)
1140        if path and path[0] == u"uri":
1141            d = defer.maybeDeferred(self.client.create_node_from_uri,
1142                                    str(path[1]))
1143            d.addCallback(lambda root: (root, path[2:]))
1144        else:
1145            d = defer.succeed((self.root,path))
1146        return d
1147
1148    def _get_parent(self, path):
1149        # fire with (parentnode, childname)
1150        if not path:
1151            def _nosuch(): raise SFTPError(FX_NO_SUCH_FILE, "path does not exist")
1152            return defer.execute(_nosuch)
1153
1154        childname = path[-1]
1155        assert isinstance(childname, unicode), repr(childname)
1156        d = self._get_root(path)
1157        def _got_root( (root, path) ):
1158            if not path:
1159                raise SFTPError(FX_NO_SUCH_FILE, "path does not exist")
1160            return root.get_child_at_path(path[:-1])
1161        d.addCallback(_got_root)
1162        def _got_parent(parent):
1163            return (parent, childname)
1164        d.addCallback(_got_parent)
1165        return d
1166
1167    def _attrs_to_metadata(self, attrs):
1168        metadata = {}
1169
1170        for key in attrs:
1171            if key == "mtime" or key == "ctime" or key == "createtime":
1172                metadata[key] = long(attrs[key])
1173            elif key.startswith("ext_"):
1174                metadata[key] = str(attrs[key])
1175
1176        return metadata
1177
1178
1179# if you have an SFTPUser, and you want something that provides ISFTPServer,
1180# then you get SFTPHandler(user)
1181components.registerAdapter(SFTPHandler, SFTPUser, ISFTPServer)
1182
1183from allmydata.frontends.auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
1184
class Dispatcher:
    """IRealm that turns an authenticated avatarID into an SFTPUser avatar."""
    implements(portal.IRealm)

    def __init__(self, client):
        self.client = client

    def requestAvatar(self, avatarID, mind, interface):
        # Only Conch users are served by this realm.
        assert interface == IConchUser

        # Mutable cell shared by the two closures below: logout() flips the
        # flag, and check_abort() reports it so in-flight transfers can see
        # that the session has ended.
        state = {'flag': False}
        def check_abort():
            return state['flag']
        def logout():
            state['flag'] = True

        root = self.client.create_node_from_uri(avatarID.rootcap)
        user = SFTPUser(check_abort, self.client, root,
                        avatarID.username, self.client.convergence)
        return (interface, user, logout)
1201
class SFTPServer(service.MultiService):
    """Twisted service that listens for SSH connections and serves SFTP
    backed by Tahoe-LAFS.

    Authentication is delegated to an account file and/or an account URL
    checker; at least one must be provided. The SSH host keys are loaded
    from the given public/private key files.
    """
    def __init__(self, client, accountfile, accounturl,
                 sftp_portstr, pubkey_file, privkey_file):
        service.MultiService.__init__(self)

        # The realm maps authenticated accounts to SFTPUser avatars.
        r = Dispatcher(client)
        p = portal.Portal(r)

        # Register whichever credential checkers were configured.
        if accountfile:
            c = AccountFileChecker(self, accountfile)
            p.registerChecker(c)
        if accounturl:
            c = AccountURLChecker(self, accounturl)
            p.registerChecker(c)
        if not accountfile and not accounturl:
            # we could leave this anonymous, with just the /uri/CAP form
            raise NeedRootcapLookupScheme("must provide some translation")

        pubkey = keys.Key.fromFile(pubkey_file)
        privkey = keys.Key.fromFile(privkey_file)
        # Local factory subclass closing over the host keys loaded above.
        class SSHFactory(factory.SSHFactory):
            publicKeys = {pubkey.sshType(): pubkey}
            privateKeys = {privkey.sshType(): privkey}
            def getPrimes(self):
                try:
                    # if present, this enables diffie-hellman-group-exchange
                    return primes.parseModuliFile("/etc/ssh/moduli")
                except IOError:
                    return None

        f = SSHFactory()
        f.portal = p

        # Bind the listening port described by sftp_portstr (strports
        # syntax) and attach it as a child of this MultiService.
        s = strports.service(sftp_portstr, f)
        s.setServiceParent(self)
1237