Update
Тестирование ядра / Tester (push) Failing after 3s

This commit is contained in:
2026-06-20 00:40:03 +03:00
parent 45883e1b95
commit 74595b7a4a
2 changed files with 17 additions and 3 deletions
+3 -2
View File
@@ -1,7 +1,7 @@
__author__ = 'RemiZOffAlex'
__email__ = 'remizoffalex@mail.ru'
from io import BufferedReader
from io import IOBase
BUFF_SIZE = 1024
CRLF = b'\r\n'
@@ -74,6 +74,7 @@ def stream_next(state):
state.position = 'headers'
return raw
if state.position == 'headers':
raw = handler_headers(response)
state.position = 'body'
return raw
if state.position == 'body':
@@ -98,7 +99,7 @@ class ResponseStream:
response = self.response
if isinstance(response.data, str):
self.handler = str_next
elif isinstance(response.data, BufferedReader):
elif isinstance(response.data, IOBase):
self.handler = stream_next
elif isinstance(response.data, bytes):
self.handler = bytes_next
+14 -1
View File
@@ -3,12 +3,25 @@ __email__ = 'remizoffalex@mail.ru'
import unittest
from io import BytesIO
from nucleus.response.common import Response
from nucleus.response.stream import ResponseStream
class TestResponse(unittest.TestCase):
def test_response(self):
def test_response_str(self):
response = Response('ok')
for item in ResponseStream(response):
print(item)
def test_response_bytes(self):
response = Response(b'ok')
for item in ResponseStream(response):
print(item)
def test_response_stream(self):
file = BytesIO(b'ok')
response = Response(file)
for item in ResponseStream(response):
print(item)