Skip to content

Commit f09e06c

Browse files
committed
Refactor SourceFlow
1 parent bddb297 commit f09e06c

File tree

6 files changed

+37
-37
lines changed

6 files changed

+37
-37
lines changed

coroutines/src/commonMain/kotlin/coroutines/DelimitingByteStreamDecoder.kt renamed to coroutines/src/commonMain/kotlin/kotlinx/io/coroutines/DelimitingByteStreamDecoder.kt

File renamed without changes.

coroutines/src/commonMain/kotlin/coroutines/SourceFlow.kt renamed to coroutines/src/commonMain/kotlin/kotlinx/io/coroutines/SourceFlow.kt

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
*/
55
package kotlinx.io.coroutines
66

7+
import kotlinx.coroutines.CancellationException
78
import kotlinx.coroutines.NonCancellable
89
import kotlinx.coroutines.flow.Flow
9-
import kotlinx.coroutines.flow.channelFlow
10-
import kotlinx.coroutines.isActive
10+
import kotlinx.coroutines.flow.flow
1111
import kotlinx.coroutines.withContext
1212
import kotlinx.coroutines.yield
1313
import kotlinx.io.Buffer
@@ -45,44 +45,44 @@ public const val READ_BUFFER_SIZE: Long = 8196
4545
public fun <T> RawSource.asFlow(
4646
decoder: StreamingDecoder<T>,
4747
readBufferSize: Long = READ_BUFFER_SIZE
48-
): Flow<T> =
49-
channelFlow {
50-
val source = this@asFlow
51-
val buffer = Buffer()
52-
var decoderClosed = false
53-
try {
54-
source.use { source ->
55-
while (isActive) {
56-
val bytesRead = source.readAtMostTo(buffer, readBufferSize)
57-
if (bytesRead == -1L) {
58-
break
59-
}
48+
): Flow<T> = flow {
49+
val source = this@asFlow
50+
val buffer = Buffer()
51+
var decoderClosed = false
52+
try {
53+
source.use { source ->
54+
while (true) {
55+
val bytesRead = source.readAtMostTo(buffer, readBufferSize)
56+
if (bytesRead == -1L) {
57+
break
58+
}
6059

61-
if (bytesRead > 0L) {
62-
val bytes = buffer.readByteArray()
63-
buffer.clear()
64-
decoder.decode(bytes) {
65-
send(it)
66-
}
60+
if (bytesRead > 0L) {
61+
val bytes = buffer.readByteArray()
62+
decoder.decode(bytes) {
63+
emit(it)
6764
}
68-
69-
yield() // Giving other coroutines a chance to run
7065
}
66+
67+
yield() // Giving other coroutines a chance to run
7168
}
72-
// Normal completion: emit any remaining buffered data
73-
decoder.onClose { send(it) }
74-
decoderClosed = true
75-
} catch (exception: IOException) {
76-
// IO error: try to emit remaining data, then close with error
77-
runCatching { decoder.onClose { send(it) } }.onSuccess { decoderClosed = true }
78-
throw exception
79-
} finally {
80-
// Ensure decoder cleanup even on cancellation or other exceptions
81-
if (!decoderClosed) {
82-
withContext(NonCancellable) {
83-
runCatching { decoder.onClose { /* discard data, cleanup only */ } }
84-
}
69+
}
70+
// Normal completion: emit any remaining buffered data
71+
decoder.onClose { emit(it) }
72+
decoderClosed = true
73+
} catch (e: CancellationException) {
74+
throw e
75+
} catch (exception: IOException) {
76+
// IO error: try to emit remaining data, then close with error
77+
runCatching { decoder.onClose { emit(it) } }.onSuccess { decoderClosed = true }
78+
throw exception
79+
} finally {
80+
// Ensure decoder cleanup even on cancellation or other exceptions
81+
if (!decoderClosed) {
82+
withContext(NonCancellable) {
83+
runCatching { decoder.onClose { /* discard data, cleanup only */ } }
8584
}
86-
buffer.clear()
8785
}
88-
}
86+
buffer.clear()
87+
}
88+
}

coroutines/src/commonMain/kotlin/coroutines/StreamingDecoder.kt renamed to coroutines/src/commonMain/kotlin/kotlinx/io/coroutines/StreamingDecoder.kt

File renamed without changes.

coroutines/src/commonTest/kotlin/coroutines/DelimitingByteStreamDecoderTest.kt renamed to coroutines/src/commonTest/kotlin/kotlinx/io/coroutines/DelimitingByteStreamDecoderTest.kt

File renamed without changes.

coroutines/src/commonTest/kotlin/coroutines/KeyValueStreamIntegrationTest.kt renamed to coroutines/src/commonTest/kotlin/kotlinx/io/coroutines/KeyValueStreamIntegrationTest.kt

File renamed without changes.

coroutines/src/commonTest/kotlin/coroutines/RawSourceAsFlowTest.kt renamed to coroutines/src/commonTest/kotlin/kotlinx/io/coroutines/RawSourceAsFlowTest.kt

File renamed without changes.

0 commit comments

Comments
 (0)