django-downloadview/django_downloadview/io.py
2024-08-05 10:53:19 +02:00

143 lines
3.8 KiB
Python

"""Low-level IO operations, for use with file wrappers."""
import io
from django.utils.encoding import force_bytes, force_str
class TextIteratorIO(io.TextIOBase):
"""A dynamically generated TextIO-like object.
Original code by Matt Joiner <anacrolix@gmail.com> from:
* http://stackoverflow.com/questions/12593576/
* https://gist.github.com/anacrolix/3788413
"""
def __init__(self, iterator):
#: Iterator/generator for content.
self._iter = iterator
#: Internal buffer.
self._left = ""
def readable(self):
return True
def _read1(self, n=None):
while not self._left:
try:
self._left = next(self._iter)
except StopIteration:
break
else:
# Make sure we handle text.
self._left = force_str(self._left)
ret = self._left[:n]
self._left = self._left[len(ret) :]
return ret
def read(self, n=None):
"""Return content up to ``n`` length."""
chunks = []
if n is None or n < 0:
while True:
m = self._read1()
if not m:
break
chunks.append(m)
else:
while n > 0:
m = self._read1(n)
if not m:
break
n -= len(m)
chunks.append(m)
return "".join(chunks)
def readline(self):
chunks = []
while True:
i = self._left.find("\n")
if i == -1:
chunks.append(self._left)
try:
self._left = next(self._iter)
except StopIteration:
self._left = ""
break
else:
chunks.append(self._left[: i + 1])
self._left = self._left[i + 1 :]
break
return "".join(chunks)
class BytesIteratorIO(io.BytesIO):
"""A dynamically generated BytesIO-like object.
Original code by Matt Joiner <anacrolix@gmail.com> from:
* http://stackoverflow.com/questions/12593576/
* https://gist.github.com/anacrolix/3788413
"""
def __init__(self, iterator):
#: Iterator/generator for content.
self._iter = iterator
#: Internal buffer.
self._left = b""
def readable(self):
return True
def _read1(self, n=None):
while not self._left:
try:
self._left = next(self._iter)
except StopIteration:
break
else:
# Make sure we handle text.
self._left = force_bytes(self._left)
ret = self._left[:n]
self._left = self._left[len(ret) :]
return ret
def read(self, n=None):
"""Return content up to ``n`` length."""
chunks = []
if n is None or n < 0:
while True:
m = self._read1()
if not m:
break
chunks.append(m)
else:
while n > 0:
m = self._read1(n)
if not m:
break
n -= len(m)
chunks.append(m)
return b"".join(chunks)
def readline(self):
chunks = []
while True:
i = self._left.find(b"\n")
if i == -1:
chunks.append(self._left)
try:
self._left = next(self._iter)
except StopIteration:
self._left = b""
break
else:
chunks.append(self._left[: i + 1])
self._left = self._left[i + 1 :]
break
return b"".join(chunks)