Skip to content

Commit 246f77b

Browse files
committed
Add RawSource.asFlow transformer
- Introduced converter from `RawSource` to Kotlin Flows. - Added `StreamingDecoder` interface and its implementation `DelimitingByteStreamDecoder` for processing byte streams with a specified delimiter. - Added tests for decoder, and flow behavior cases.
1 parent b4e59a4 commit 246f77b

File tree

8 files changed

+1014
-2
lines changed

8 files changed

+1014
-2
lines changed

core/build.gradle.kts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,10 @@ kotlin {
4848
sourceSets {
4949
commonMain.dependencies {
5050
api(project(":kotlinx-io-bytestring"))
51+
api(libs.kotlinx.coroutines.core)
5152
}
52-
appleTest.dependencies {
53-
implementation(libs.kotlinx.coroutines.core)
53+
commonTest.dependencies {
54+
implementation(libs.kotlinx.coroutines.test)
5455
}
5556
}
5657
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2017-2025 JetBrains s.r.o. and respective authors and developers.
3+
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file.
4+
*/
5+
package kotlinx.io.coroutines
6+
7+
import kotlinx.io.Buffer
8+
import kotlinx.io.readByteArray
9+
10+
/**
11+
* A streaming decoder that reads a continuous stream of bytes and separates it into discrete
12+
* chunks based on a specified delimiter. The default delimiter is the newline character (`'\n'`).
13+
*
14+
* This class buffers incoming byte arrays and emits individual byte arrays once a delimiter is
15+
* encountered. Any remaining bytes in the buffer are emitted when [onClose] is called.
16+
*
17+
* ## Example
18+
*
19+
* ```kotlin
20+
* val decoder = DelimitingByteStreamDecoder()
21+
* val source: RawSource = // ...
22+
* source.asFlow(decoder).collect { line ->
23+
* println("Received: ${line.decodeToString()}")
24+
* }
25+
* ```
26+
*
27+
* ## Thread Safety
28+
*
29+
* This class is **not thread-safe**. Each instance maintains internal mutable state and must
30+
* not be shared across multiple flows or concurrent coroutines.
31+
*
32+
* ## Lifecycle
33+
*
34+
* After [onClose] is called, this decoder **cannot be reused**. The internal buffer is closed
35+
* and the decoder should be discarded.
36+
*
37+
* @property delimiter The byte value used as a delimiter to separate the stream into chunks.
38+
* Defaults to the newline character (`'\n'`).
39+
*/
40+
public class DelimitingByteStreamDecoder(
41+
public val delimiter: Byte = '\n'.code.toByte(),
42+
) : StreamingDecoder<ByteArray> {
43+
44+
private val buffer = Buffer()
45+
46+
override suspend fun decode(bytes: ByteArray, byteConsumer: suspend (ByteArray) -> Unit) {
47+
var startIndex = 0
48+
for (i in bytes.indices) {
49+
if (bytes[i] == delimiter) {
50+
buffer.write(bytes, startIndex, i)
51+
// flush and clear buffer
52+
byteConsumer.invoke(buffer.readByteArray())
53+
startIndex = i + 1
54+
}
55+
}
56+
// Buffer any remaining bytes after the last delimiter
57+
if (startIndex < bytes.size) {
58+
buffer.write(bytes, startIndex, bytes.size)
59+
}
60+
}
61+
62+
override suspend fun onClose(byteConsumer: suspend (ByteArray) -> Unit) {
63+
if (buffer.size > 0) {
64+
byteConsumer.invoke(buffer.readByteArray())
65+
}
66+
buffer.close()
67+
}
68+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright 2017-2025 JetBrains s.r.o. and respective authors and developers.
3+
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file.
4+
*/
5+
package kotlinx.io.coroutines
6+
7+
import kotlinx.coroutines.NonCancellable
8+
import kotlinx.coroutines.flow.Flow
9+
import kotlinx.coroutines.flow.channelFlow
10+
import kotlinx.coroutines.isActive
11+
import kotlinx.coroutines.withContext
12+
import kotlinx.coroutines.yield
13+
import kotlinx.io.Buffer
14+
import kotlinx.io.IOException
15+
import kotlinx.io.RawSource
16+
import kotlinx.io.readByteArray
17+
18+
public const val READ_BUFFER_SIZE: Long = 8196
19+
20+
/**
21+
* Converts this [RawSource] into a Kotlin [Flow], emitting decoded data using the provided [StreamingDecoder].
22+
*
23+
* This function reads data from the source in chunks, decodes it using the provided decoder, and emits
24+
* the decoded elements downstream. The returned flow is cold and will start reading from the source
25+
* when collected.
26+
*
27+
* ## Lifecycle and Resource Management
28+
*
29+
* - The source is automatically closed when the flow completes, fails, or is cancelled
30+
* - The decoder's [StreamingDecoder.onClose] is always called for cleanup, even on cancellation
31+
* - On normal completion or [IOException], any remaining buffered data in the decoder is emitted
32+
* - On cancellation, the decoder is cleaned up but remaining data is discarded
33+
*
34+
* ## Backpressure
35+
*
36+
* The flow respects structured concurrency and backpressure. Reading from the source is suspended
37+
* when the downstream collector cannot keep up.
38+
*
39+
* @param T The type of elements emitted by the Flow after decoding.
40+
* @param decoder The [StreamingDecoder] used to decode data read from this source.
41+
* @param readBufferSize The size of the buffer used for reading from the source. Defaults to [READ_BUFFER_SIZE].
42+
* @return A cold [Flow] that emits decoded elements of type [T].
43+
* @throws IOException if an I/O error occurs while reading from the source.
44+
*/
45+
public fun <T> RawSource.asFlow(
46+
decoder: StreamingDecoder<T>,
47+
readBufferSize: Long = READ_BUFFER_SIZE
48+
): Flow<T> =
49+
channelFlow {
50+
val source = this@asFlow
51+
val buffer = Buffer()
52+
var decoderClosed = false
53+
try {
54+
source.use { source ->
55+
while (isActive) {
56+
val bytesRead = source.readAtMostTo(buffer, readBufferSize)
57+
if (bytesRead == -1L) {
58+
break
59+
}
60+
61+
if (bytesRead > 0L) {
62+
val bytes = buffer.readByteArray()
63+
buffer.clear()
64+
decoder.decode(bytes) {
65+
send(it)
66+
}
67+
}
68+
69+
yield() // Giving other coroutines a chance to run
70+
}
71+
}
72+
// Normal completion: emit any remaining buffered data
73+
decoder.onClose { send(it) }
74+
decoderClosed = true
75+
} catch (exception: IOException) {
76+
// IO error: try to emit remaining data, then close with error
77+
runCatching { decoder.onClose { send(it) } }.onSuccess { decoderClosed = true }
78+
throw exception
79+
} finally {
80+
// Ensure decoder cleanup even on cancellation or other exceptions
81+
if (!decoderClosed) {
82+
withContext(NonCancellable) {
83+
runCatching { decoder.onClose { /* discard data, cleanup only */ } }
84+
}
85+
}
86+
buffer.clear()
87+
}
88+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2017-2025 JetBrains s.r.o. and respective authors and developers.
3+
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file.
4+
*/
5+
package kotlinx.io.coroutines
6+
7+
/**
8+
* A generic interface for decoding a stream of bytes into discrete elements of type [T].
9+
*
10+
* Implementations of this interface are responsible for processing input byte arrays, decoding
11+
* them into meaningful elements, and delivering them to the provided `byteConsumer` function in
12+
* sequential order. This allows for efficient handling of streaming data and enables
13+
* processing without requiring the entire stream to be loaded into memory.
14+
*
15+
* ## Lifecycle
16+
*
17+
* The decoder processes a stream through repeated calls to [decode], followed by a final call
18+
* to [onClose] when the stream ends. After [onClose] is called, the decoder should not be reused.
19+
*
20+
* ## Thread Safety
21+
*
22+
* Implementations are not required to be thread-safe. Each decoder instance should be used with
23+
* a single stream and should not be shared across concurrent coroutines.
24+
*
25+
* @param T The type of elements produced by the decoder.
26+
*/
27+
public interface StreamingDecoder<T> {
28+
/**
29+
* Decodes a chunk of bytes from the input stream.
30+
*
31+
* This method may be called multiple times as data arrives. Implementations should buffer
32+
* incomplete elements internally and emit complete elements via [byteConsumer].
33+
*
34+
* @param bytes The input byte array to decode.
35+
* @param byteConsumer A suspend function that receives decoded elements.
36+
*/
37+
public suspend fun decode(bytes: ByteArray, byteConsumer: suspend (T) -> Unit)
38+
39+
/**
40+
* Called when the input stream ends, allowing the decoder to emit any remaining buffered data
41+
* and perform cleanup.
42+
*
43+
* After this method is called, the decoder should not be used again.
44+
*
45+
* @param byteConsumer A suspend function that receives any final decoded elements.
46+
*/
47+
public suspend fun onClose(byteConsumer: suspend (T) -> Unit)
48+
}
49+

0 commit comments

Comments
 (0)