| 1 | """ |
|---|
| 2 | Tests for allmydata.util.consumer. |
|---|
| 3 | |
|---|
| 4 | Ported to Python 3. |
|---|
| 5 | """ |
|---|
| 6 | |
|---|
| 7 | from zope.interface import implementer |
|---|
| 8 | from twisted.internet.interfaces import IPushProducer, IPullProducer |
|---|
| 9 | |
|---|
| 10 | from allmydata.util.consumer import MemoryConsumer |
|---|
| 11 | |
|---|
| 12 | from .common import ( |
|---|
| 13 | SyncTestCase, |
|---|
| 14 | ) |
|---|
| 15 | from testtools.matchers import ( |
|---|
| 16 | Equals, |
|---|
| 17 | ) |
|---|
| 18 | |
|---|
| 19 | |
|---|
| 20 | @implementer(IPushProducer) |
|---|
| 21 | @implementer(IPullProducer) |
|---|
| 22 | class Producer: |
|---|
| 23 | """Can be used as either streaming or non-streaming producer. |
|---|
| 24 | |
|---|
| 25 | If used as streaming, the test should call iterate() manually. |
|---|
| 26 | """ |
|---|
| 27 | |
|---|
| 28 | def __init__(self, consumer, data): |
|---|
| 29 | self.data = data |
|---|
| 30 | self.consumer = consumer |
|---|
| 31 | self.done = False |
|---|
| 32 | |
|---|
| 33 | def stopProducing(self): |
|---|
| 34 | pass |
|---|
| 35 | |
|---|
| 36 | def pauseProducing(self): |
|---|
| 37 | pass |
|---|
| 38 | |
|---|
| 39 | def resumeProducing(self): |
|---|
| 40 | """Kick off streaming.""" |
|---|
| 41 | self.iterate() |
|---|
| 42 | |
|---|
| 43 | def iterate(self): |
|---|
| 44 | """Do another iteration of writing.""" |
|---|
| 45 | if self.done: |
|---|
| 46 | raise RuntimeError( |
|---|
| 47 | "There's a bug somewhere, shouldn't iterate after being done" |
|---|
| 48 | ) |
|---|
| 49 | if self.data: |
|---|
| 50 | self.consumer.write(self.data.pop(0)) |
|---|
| 51 | else: |
|---|
| 52 | self.done = True |
|---|
| 53 | self.consumer.unregisterProducer() |
|---|
| 54 | |
|---|
| 55 | |
|---|
| 56 | class MemoryConsumerTests(SyncTestCase): |
|---|
| 57 | """Tests for MemoryConsumer.""" |
|---|
| 58 | |
|---|
| 59 | def test_push_producer(self): |
|---|
| 60 | """ |
|---|
| 61 | A MemoryConsumer accumulates all data sent by a streaming producer. |
|---|
| 62 | """ |
|---|
| 63 | consumer = MemoryConsumer() |
|---|
| 64 | producer = Producer(consumer, [b"abc", b"def", b"ghi"]) |
|---|
| 65 | consumer.registerProducer(producer, True) |
|---|
| 66 | self.assertThat(consumer.chunks, Equals([b"abc"])) |
|---|
| 67 | producer.iterate() |
|---|
| 68 | producer.iterate() |
|---|
| 69 | self.assertThat(consumer.chunks, Equals([b"abc", b"def", b"ghi"])) |
|---|
| 70 | self.assertFalse(consumer.done) |
|---|
| 71 | producer.iterate() |
|---|
| 72 | self.assertThat(consumer.chunks, Equals([b"abc", b"def", b"ghi"])) |
|---|
| 73 | self.assertTrue(consumer.done) |
|---|
| 74 | |
|---|
| 75 | def test_pull_producer(self): |
|---|
| 76 | """ |
|---|
| 77 | A MemoryConsumer accumulates all data sent by a non-streaming producer. |
|---|
| 78 | """ |
|---|
| 79 | consumer = MemoryConsumer() |
|---|
| 80 | producer = Producer(consumer, [b"abc", b"def", b"ghi"]) |
|---|
| 81 | consumer.registerProducer(producer, False) |
|---|
| 82 | self.assertThat(consumer.chunks, Equals([b"abc", b"def", b"ghi"])) |
|---|
| 83 | self.assertTrue(consumer.done) |
|---|
| 84 | |
|---|
| 85 | |
|---|
| 86 | # download_to_data() is effectively tested by some of the filenode tests, e.g. |
|---|
| 87 | # test_immutable.py. |
|---|