Notice
Recent Posts
Recent Comments
Link
study record
[Swift Concurrency] Custom Asynchronous Sequences With AsyncStream 본문
Swift/Concurrency
[Swift Concurrency] Custom Asynchronous Sequences With AsyncStream
asong 2023. 5. 29. 22:39Custom Asynchronous Sequences를 만드는 방법 첫번째
AsyncSequence와 AsyncIteratorProtocol 둘 다를 채택해서 구현
- makeAsyncIterator() 구현
- next async thorws() 구현
두번째 방법 -> AsyncStream
- AsyncStream은 AsyncSequence를 채택한다.
- AsyncSequence로부터 기본 행동들을 다 상속한다.
- 싱글 클로저로부터 값들 produce
- sequence의 끝에 값이나 nil을 리턴하는 클로저를 가지는 방법과, continuation을 활용하는 방법이 있다.
🧪 AsyncStream 안의 비동기 메소드 구현에 Task는 꼭 필요한가?
let stream = AsyncStream<Int> { continuation in
Task.detached {
for num in 1 ... 5 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
continuation.yield(num)
}
continuation.finish()
}
}
// AsyncStream 안의 Task는 꼭 필요한가?
let stream = AsyncStream<Int> { continuation in
for num in 1 ... 5 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
continuation.yield(num)
}
continuation.finish()
}
→ 없으면 컴파일 에러 발생. invalid conversion from throwing function of type '(AsyncStream<Int>.Continuation) async throws -> Void' to non-throwing function type '(AsyncStream<Int>.Continuation) -> Void'
AsyncStream 안의 클로저는 async throws하면 안 된다. Task가 필요.
🧪 AsyncStream 안에 Task 바깥에 코드를 배치 가능할까?
let stream = AsyncStream<Int> { continuation in
print("6")
Task.detached {
for num in 1 ... 5 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
continuation.yield(num)
}
continuation.finish()
}
print("7")
}
Task {
for await num in stream {
print(num)
}
}
// 6 7 1 2 3 4 5
→ 가능하다.
실행 순서는 Task보다 이전에 시작된다.
🧪 AsyncThrowingStream은 어떨까?
enum NumError: Error {
case overThree
}
let stream = AsyncThrowingStream<Int, Error> { continuation in
print("6")
Task.detached {
for num in 1 ... 5 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
if num > 3 {
continuation.finish(throwing: NumError.overThree)
} else {
continuation.yield(num)
}
}
continuation.finish()
}
print("7")
}
Task {
do {
for try await num in stream {
print(num)
}
} catch {
print(error)
}
}
// 6 7 1 2 3 overThree
→ AsyncThrowingStream도 마찬가지로 클로저 안에 Task를 지울 수 없다. 클로저가 async하면 안 된다.
→ Task 바깥이 똑같이 먼저 실행된다.
🧪 AsyncThrowingStream 안에 두 개의 Task를 두면 어떻게 실행될까?
enum NumError: Error {
case overThree
case overFive
}
let stream = AsyncThrowingStream<Int, Error> { continuation in
print("0")
Task.detached {
for num in 1 ... 5 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
if num > 3 {
continuation.finish(throwing: NumError.overThree)
} else {
continuation.yield(num)
}
}
continuation.finish()
}
print("6")
Task.detached {
for num in 7 ... 10 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
if num > 5 {
continuation.finish(throwing: NumError.overFive)
} else {
continuation.yield(num)
}
}
continuation.finish()
}
}
Task {
do {
for try await num in stream {
print(num)
}
} catch {
print(error)
}
}
// 0 6 1 overFive
여전히 Task 바깥의 것들이 먼저 실행되고,
두 개의 Task가 병렬적으로 실행되며, 에러가 발생하면 두 테스크 모두 멈춘다.
🧪 AsyncStream에도 두 개의 Task를 둬보았을 때!
let stream = AsyncStream<Int> { continuation in
print("0")
Task.detached {
for num in 1 ... 3 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
continuation.yield(num)
}
continuation.finish()
}
print("4")
Task.detached {
for num in 5 ... 7 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
continuation.yield(num)
}
continuation.finish()
}
}
Task {
for await num in stream {
print(num)
}
}
// 0 4 1 5 2 6 3 7
똑같이 두 테스크가 병렬적으로 실행된다.
🧪 스레드 체크
let stream = AsyncStream<Int> { continuation in
print("stream 시작 : \(Thread.isMainThread)")
Task.detached {
print("stream 내부 : \(Thread.isMainThread)")
for num in 1 ... 3 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
continuation.yield(num)
}
continuation.finish()
}
print("stream 끝 : \(Thread.isMainThread)")
}
Task {
for await num in stream {
print(num)
}
}
// stream 시작 : true
// stream 내부 : false
// stream 끝 : true
AsyncStream, Continuation에 메인스레드 관련 내부구현이 있는 것은 아니고, Playground 자체가 기본 메인 스레드인 듯 하다.
🧪 Task에 @MainActor 붙이기
let stream = AsyncStream<Int> { continuation in
print("stream 시작 : \(Thread.isMainThread)")
Task.detached { @MainActor in
print("stream 내부 : \(Thread.isMainThread)")
for num in 1 ... 3 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
continuation.yield(num)
}
continuation.finish()
}
print("stream 끝 : \(Thread.isMainThread)")
}
Task {
for await num in stream {
print(num)
}
}
// stream 시작 : true
// stream 끝 : true
// stream 내부 : true
🧪 AsyncStream 호출하는 쪽은 AsyncStream 내부 구현에 스레드 영향을 받는가?
let stream = AsyncStream<Int> { continuation in
print("stream 시작 : \(Thread.isMainThread)")
Task.detached { @MainActor in
print("stream 내부1 : \(Thread.isMainThread)")
for num in 1 ... 3 {
try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
continuation.yield(num)
}
continuation.finish()
}
print("stream 끝 : \(Thread.isMainThread)")
}
Task {
for await num in stream {
print("\(num) : \(Thread.isMainThread)")
}
}
// stream 시작 : true
// stream 끝 : true
// stream 내부1 : true
// 1 : false
// 2 : false
// 3 : false
stream을 호출하는 쪽과 stream 내부의 스레드는 연관이 없다.
'Swift > Concurrency' 카테고리의 다른 글
[Swift Concurrency] AsyncStream, AsyncThrowingStream, Continuation이란? (0) | 2024.01.30 |
---|---|
[Swift Concurrency] Intermediate async/await & CheckedContinuation (0) | 2023.06.11 |
[Swift Concurrency] AsyncSequence & Intermediate Task (0) | 2023.05.24 |
[Swift Concurrency] Getting Started with async/await (0) | 2023.05.21 |