| 1 | """ |
|---|
| 2 | Tests for allmydata.util.netstring. |
|---|
| 3 | |
|---|
| 4 | Ported to Python 3. |
|---|
| 5 | """ |
|---|
| 6 | |
|---|
| 7 | from twisted.trial import unittest |
|---|
| 8 | |
|---|
| 9 | from allmydata.util.netstring import netstring, split_netstring |
|---|
| 10 | |
|---|
| 11 | |
|---|
class Netstring(unittest.TestCase):
    """Tests for netstring() encoding and split_netstring() decoding."""

    def test_encode(self):
        """netstring() correctly encodes the given bytes."""
        result = netstring(b"abc")
        self.assertEqual(result, b"3:abc,")
        self.assertIsInstance(result, bytes)

    def test_split(self):
        """
        split_netstring() returns the decoded elements as bytes together
        with the position just past the consumed data, and raises
        ValueError when the count or the required trailer does not match.
        """
        a = netstring(b"hello") + netstring(b"world")
        expected = [b"hello", b"world"]

        for s in split_netstring(a, 2)[0]:
            self.assertIsInstance(s, bytes)

        self.assertEqual(split_netstring(a, 2), (expected, len(a)))
        self.assertEqual(
            split_netstring(a, 2, required_trailer=b""),
            (expected, len(a)),
        )

        # Asking for more netstrings than the data holds is an error.
        self.assertRaises(ValueError, split_netstring, a, 3)

        # An empty required trailer forbids any leftover bytes...
        self.assertRaises(
            ValueError,
            split_netstring, a + b" extra", 2, required_trailer=b"",
        )
        # ...whereas with no required trailer, leftover bytes are ignored
        # and the returned position stops right after the last netstring.
        self.assertEqual(
            split_netstring(a + b" extra", 2),
            (expected, len(a)),
        )

        # A matching trailer counts as consumed, so the position moves
        # past it as well.
        self.assertEqual(
            split_netstring(a + b"++", 2, required_trailer=b"++"),
            (expected, len(a) + 2),
        )
        self.assertRaises(
            ValueError,
            split_netstring, a + b"+", 2, required_trailer=b"not",
        )

    def test_extra(self):
        """Trailing non-netstring data is left unconsumed by default."""
        a = netstring(b"hello")
        self.assertEqual(split_netstring(a, 1), ([b"hello"], len(a)))

        b = netstring(b"hello") + b"extra stuff"
        self.assertEqual(split_netstring(b, 1), ([b"hello"], len(a)))

    def test_nested(self):
        """
        A netstring whose payload itself contains netstrings (plus extra
        data) round-trips intact and can then be split a second time.
        """
        a = netstring(b"hello") + netstring(b"world") + b"extra stuff"
        b = netstring(b"a") + netstring(b"is") + netstring(a) + netstring(b".")

        (top, pos) = split_netstring(b, 4)
        self.assertEqual(len(top), 4)
        self.assertEqual(top[0], b"a")
        self.assertEqual(top[1], b"is")
        self.assertEqual(top[2], a)
        self.assertEqual(top[3], b".")
        # b is exactly four netstrings, so all of it must have been consumed.
        self.assertEqual(pos, len(b))

        # The inner payload carries trailing data, so requiring an empty
        # trailer must fail, while a plain split ignores the extra bytes.
        self.assertRaises(
            ValueError, split_netstring, a, 2, required_trailer=b"",
        )
        bottom = split_netstring(a, 2)
        self.assertEqual(
            bottom,
            (
                [b"hello", b"world"],
                len(netstring(b"hello") + netstring(b"world")),
            ),
        )