study record

[Swift Concurrency] Custom Asynchronous Sequences With AsyncStream 본문

Swift/Concurrency

[Swift Concurrency] Custom Asynchronous Sequences With AsyncStream

asong 2023. 5. 29. 22:39

Custom Asynchronous Sequences를 만드는 방법 첫번째

AsyncSequenceAsyncIteratorProtocol 둘 다를 채택해서 구현

- makeAsyncIterator() 구현

- next async thorws() 구현

 

두번째 방법 -> AsyncStream

- AsyncStream은 AsyncSequence를 채택한다.

    - AsyncSequence로부터 기본 행동들을 다 상속한다.

- 싱글 클로저로부터 값들 produce

- sequence의 끝에 값이나 nil을 리턴하는 클로저를 가지는 방법과, continuation을 활용하는 방법이 있다.

 
 

🧪 AsyncStream 안의 비동기 메소드 구현에 Task는 꼭 필요한가?

let stream = AsyncStream<Int> { continuation in
    Task.detached {
        for num in 1 ... 5 {
            try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
            continuation.yield(num)
        }
        continuation.finish()
   }
}

// AsyncStream 안의 Task는 꼭 필요한가?

let stream = AsyncStream<Int> { continuation in
    for num in 1 ... 5 {
        try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
        continuation.yield(num)
    }
	  continuation.finish()
}​
→ 없으면 컴파일 에러 발생. invalid conversion from throwing function of type '(AsyncStream<Int>.Continuation) async throws -> Void' to non-throwing function type '(AsyncStream<Int>.Continuation) -> Void'
AsyncStream 안의 클로저는 async throws하면 안 된다. Task가 필요.
 

 

🧪 AsyncStream 안에 Task 바깥에 코드를 배치 가능할까?

let stream = AsyncStream<Int> { continuation in
    print("6")
    Task.detached {
        for num in 1 ... 5 {
            try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
            continuation.yield(num)
        }
        continuation.finish()
  }
    print("7")
}

Task {
    for await num in stream {
        print(num)
    }
}
// 6 7 1 2 3 4 5​
→ 가능하다.
실행 순서는 Task보다 이전에 시작된다.
 

🧪 AsyncThrowingStream은 어떨까?

enum NumError: Error {
    case overThree
}

let stream = AsyncThrowingStream<Int, Error> { continuation in
		print("6")
    Task.detached {
        for num in 1 ... 5 {
            try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
            if num > 3 {
                continuation.finish(throwing: NumError.overThree)
            } else {
                continuation.yield(num)
            }
        }
        continuation.finish()
    }
		print("7")
}

Task {
    do {
        for try await num in stream {
            print(num)
        }
    } catch {
      print(error)
    }
}

// 6 7 1 2 3 overThree

→ AsyncThrowingStream도 마찬가지로 클로저 안에 Task를 지울 수 없다. 클로저가 async하면 안 된다.

→ Task 바깥이 똑같이 먼저 실행된다.

 

🧪 AsyncThrowingStream 안에 두 개의 Task를 두면 어떻게 실행될까?

enum NumError: Error {
    case overThree
    case overFive
}

let stream = AsyncThrowingStream<Int, Error> { continuation in
    print("0")
    Task.detached {
        for num in 1 ... 5 {
            try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
            if num > 3 {
                continuation.finish(throwing: NumError.overThree)
            } else {
                continuation.yield(num)
            }
        }
        continuation.finish()
    }
    print("6")
    Task.detached {
        for num in 7 ... 10 {
            try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
            if num > 5 {
                continuation.finish(throwing: NumError.overFive)
            } else {
                continuation.yield(num)
            }
        }
        continuation.finish()
    }
}

Task {
    do {
        for try await num in stream {
            print(num)
        }
    } catch {
      print(error)
    }
}
// 0 6 1 overFive
여전히 Task 바깥의 것들이 먼저 실행되고,
두 개의 Task가 병렬적으로 실행되며, 에러가 발생하면 두 테스크 모두 멈춘다.
 

🧪 AsyncStream에도 두 개의 Task를 둬보았을 때!

let stream = AsyncStream<Int> { continuation in
    print("0")
    Task.detached {
        for num in 1 ... 3 {
            try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
            continuation.yield(num)
        }
        continuation.finish()
    }
    print("4")
    Task.detached {
        for num in 5 ... 7 {
            try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
            continuation.yield(num)
        }
        continuation.finish()
    }
}

Task {
    for await num in stream {
        print(num)
    }
}

// 0 4 1 5 2 6 3 7​
똑같이 두 테스크가 병렬적으로 실행된다.

 

🧪 스레드 체크

let stream = AsyncStream<Int> { continuation in
    print("stream 시작 : \(Thread.isMainThread)")
    Task.detached {
        print("stream 내부 : \(Thread.isMainThread)")
        for num in 1 ... 3 {
            try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
            continuation.yield(num)
        }
        continuation.finish()
    }
    print("stream 끝 : \(Thread.isMainThread)")
}

Task {
    for await num in stream {
        print(num)
    }
}

// stream 시작 : true
// stream 내부 : false
// stream 끝 : true​
AsyncStream, Continuation에 메인스레드 관련 내부구현이 있는 것은 아니고, Playground 자체가 기본 메인 스레드인 듯 하다.
 

🧪 Task에 @MainActor 붙이기

let stream = AsyncStream<Int> { continuation in
    print("stream 시작 : \(Thread.isMainThread)")
    Task.detached { @MainActor in
        print("stream 내부 : \(Thread.isMainThread)")
        for num in 1 ... 3 {
            try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
            continuation.yield(num)
        }
        continuation.finish()
    }
    print("stream 끝 : \(Thread.isMainThread)")
}

Task {
    for await num in stream {
        print(num)
    }
}

// stream 시작 : true
// stream 끝 : true
// stream 내부 : true​
 

🧪 AsyncStream 호출하는 쪽은 AsyncStream 내부 구현에 스레드 영향을 받는가?

let stream = AsyncStream<Int> { continuation in
    print("stream 시작 : \(Thread.isMainThread)")
    Task.detached { @MainActor in
        print("stream 내부1 : \(Thread.isMainThread)")
        for num in 1 ... 3 {
            try await Task.sleep(nanoseconds: 1 * 1_000_000_000)
            continuation.yield(num)
        }
        continuation.finish()
    }
    print("stream 끝 : \(Thread.isMainThread)")
}

Task {
    for await num in stream {
        print("\(num) : \(Thread.isMainThread)")
    }
}

// stream 시작 : true
// stream 끝 : true
// stream 내부1 : true
// 1 : false
// 2 : false
// 3 : false
stream을 호출하는 쪽과 stream 내부의 스레드는 연관이 없다.