| 1 | """ |
|---|
| 2 | Ported to Python 3. |
|---|
| 3 | """ |
|---|
| 4 | |
|---|
| 5 | import os |
|---|
| 6 | from io import BytesIO |
|---|
| 7 | from ..common import SyncTestCase |
|---|
| 8 | from allmydata.mutable.publish import MutableFileHandle |
|---|
| 9 | |
|---|
| 10 | |
|---|
class FileHandle(SyncTestCase):
    """Exercise MutableFileHandle over an in-memory buffer and a real file.

    ``setUp`` gives every test a fresh ``self.uploadable`` that wraps a
    ``BytesIO`` (``self.sio``) preloaded with ``self.test_data``.
    """

    def setUp(self):
        """Create the payload, its BytesIO wrapper, and the uploadable."""
        super(FileHandle, self).setUp()
        self.test_data = b"Test Data" * 50000
        self.sio = BytesIO(self.test_data)
        self.uploadable = MutableFileHandle(self.sio)

    def test_filehandle_read(self):
        """Sequential fixed-size reads return consecutive slices of the data."""
        self.basedir = "mutable/FileHandle/test_filehandle_read"
        chunk_size = 10
        for start in range(0, len(self.test_data), chunk_size):
            # read() hands back an iterable of byte strings; join it into
            # one bytes object before comparing.
            data = b"".join(self.uploadable.read(chunk_size))
            self.assertEqual(data, self.test_data[start:start + chunk_size])

    def test_filehandle_get_size(self):
        """get_size() reports the full length of the wrapped data."""
        self.basedir = "mutable/FileHandle/test_filehandle_get_size"
        actual_size = len(self.test_data)
        size = self.uploadable.get_size()
        self.assertEqual(size, actual_size)

    def test_filehandle_get_size_out_of_order(self):
        """get_size() between reads must not move the read position."""
        # We should be able to call get_size whenever we want without
        # disturbing the location of the seek pointer.
        chunk_size = 100
        data = self.uploadable.read(chunk_size)
        self.assertEqual(b"".join(data), self.test_data[:chunk_size])

        # Now get the size.
        size = self.uploadable.get_size()
        self.assertEqual(size, len(self.test_data))

        # Now get more data. We should be right where we left off.
        more_data = self.uploadable.read(chunk_size)
        start = chunk_size
        end = chunk_size * 2
        self.assertEqual(b"".join(more_data), self.test_data[start:end])

    def test_filehandle_file(self):
        """MutableFileHandle behaves the same on a real file as on BytesIO."""
        # Make sure that the MutableFileHandle works on a file as well
        # as a BytesIO object, since in some cases it will be asked to
        # deal with files.
        self.basedir = self.mktemp()
        # NOTE(review): mktemp() appears to return only a path name, so the
        # directory is created explicitly here -- confirm whether needed.
        os.mkdir(self.basedir)
        f_path = os.path.join(self.basedir, "test_file")
        with open(f_path, "wb") as f:
            f.write(self.test_data)

        # Keep the read handle inside a context manager so it is closed
        # even when an assertion below fails.
        with open(f_path, "rb") as f:
            uploadable = MutableFileHandle(f)

            data = uploadable.read(len(self.test_data))
            self.assertEqual(b"".join(data), self.test_data)
            size = uploadable.get_size()
            self.assertEqual(size, len(self.test_data))

    def test_close(self):
        """close() on the uploadable closes the underlying file object."""
        # Make sure that the MutableFileHandle closes its handle when
        # told to do so.
        self.uploadable.close()
        self.assertTrue(self.sio.closed)