Appearance
并发编程
本指南介绍 Targo 中的并发编程模型,包括 goroutine、channel 和 select 语句。
目标读者
- 熟悉 TypeScript async/await 的开发者
- 想要理解 Go 并发模型的开发者
- 需要编写高性能并发程序的开发者
核心概念
Targo 采用 Go 的并发模型:
- goroutine - 轻量级线程,使用
go()函数启动 - channel - goroutine 之间的通信管道
- select - 多路复用,同时等待多个 channel 操作
与 TypeScript 的区别
TypeScript 使用 Promise 和 async/await 处理异步操作。Targo 使用 goroutine 和 channel 实现真正的并发。
Goroutine
启动 Goroutine
使用 go() 函数启动一个新的 goroutine:
typescript
import { go } from "@targo/runtime";
// 启动一个 goroutine
go(() => {
console.log("Hello from goroutine!");
});
// 主程序继续执行
console.log("Hello from main!");
// 输出顺序不确定:
// Hello from main!
// Hello from goroutine!
// 或
// Hello from goroutine!
// Hello from main!编译为 Go:
go
go func() {
fmt.Println("Hello from goroutine!")
}()
fmt.Println("Hello from main!")带参数的 Goroutine
typescript
function printNumbers(from: int, to: int): void {
for (let i = from; i <= to; i++) {
console.log(i);
time.Sleep(100 * time.Millisecond);
}
}
// 启动多个 goroutine
go(() => printNumbers(1, 5));
go(() => printNumbers(10, 15));
// 等待 goroutine 完成
time.Sleep(1 * time.Second);Goroutine 的特点
| 特性 | 说明 |
|---|---|
| 轻量级 | 每个 goroutine 只占用几 KB 内存 |
| 并发执行 | 多个 goroutine 可以同时运行 |
| 调度器管理 | Go 运行时自动调度 goroutine |
| 无返回值 | goroutine 不能直接返回值,使用 channel 通信 |
Channel
Channel 是 goroutine 之间通信的管道。
创建 Channel
typescript
// 无缓冲 channel(同步)
let ch = chan.make<int>();
// 有缓冲 channel(异步)
let buffered = chan.make<string>(10); // 缓冲大小 10编译为 Go:
go
ch := make(chan int)
buffered := make(chan string, 10)发送和接收
typescript
let ch = chan.make<int>();
// 在 goroutine 中发送
go(() => {
ch.send(42);
});
// 在主程序中接收
let value = ch.receive();
console.log(value); // 42编译为 Go:
go
ch := make(chan int)
go func() {
ch <- 42
}()
value := <-ch
fmt.Println(value)无缓冲 vs 有缓冲 Channel
无缓冲 channel(同步):
typescript
let ch = chan.make<int>();
// 发送会阻塞,直到有接收者
go(() => {
console.log("Sending...");
ch.send(42);
console.log("Sent!");
});
time.Sleep(1 * time.Second);
console.log("Receiving...");
let val = ch.receive();
console.log(`Received: ${val}`);
// 输出:
// Sending...
// Receiving...
// Received: 42
// Sent!有缓冲 channel(异步):
typescript
let ch = chan.make<int>(2); // 缓冲大小 2
// 可以发送 2 个值而不阻塞
ch.send(1);
ch.send(2);
console.log("Sent 2 values");
// 第 3 个发送会阻塞
go(() => {
ch.send(3); // 阻塞,直到有接收者
console.log("Sent 3rd value");
});
time.Sleep(1 * time.Second);
console.log(ch.receive()); // 1
console.log(ch.receive()); // 2
console.log(ch.receive()); // 3关闭 Channel
typescript
let ch = chan.make<int>(3);
// 发送数据
ch.send(1);
ch.send(2);
ch.send(3);
// 关闭 channel
ch.close();
// 从已关闭的 channel 接收
let [val1, ok1] = ch.tryReceive(); // [1, true]
let [val2, ok2] = ch.tryReceive(); // [2, true]
let [val3, ok3] = ch.tryReceive(); // [3, true]
let [val4, ok4] = ch.tryReceive(); // [0, false] - channel 已空且已关闭注意
- 只有发送者应该关闭 channel
- 向已关闭的 channel 发送会 panic
- 从已关闭的 channel 接收会返回零值
迭代 Channel
使用 for...of 循环迭代 channel,直到它被关闭:
typescript
let ch = chan.make<string>(3);
// 在 goroutine 中发送数据
go(() => {
ch.send("hello");
ch.send("world");
ch.send("!");
ch.close(); // 必须关闭,否则循环会永久阻塞
});
// 迭代直到 channel 关闭
for (const msg of ch) {
console.log(msg);
}
// 输出:
// hello
// world
// !Select 语句
Select 允许同时等待多个 channel 操作,类似于 switch 语句。
基本用法
typescript
let ch1 = chan.make<int>();
let ch2 = chan.make<string>();
// 在 goroutine 中发送数据
go(() => {
time.Sleep(1 * time.Second);
ch1.send(42);
});
go(() => {
time.Sleep(2 * time.Second);
ch2.send("hello");
});
// 等待第一个可用的 channel
switch (chan.$select) {
case ch1.$recv:
let num = ch1.$value;
console.log(`Received number: ${num}`);
break;
case ch2.$recv:
let str = ch2.$value;
console.log(`Received string: ${str}`);
break;
}
// 输出:Received number: 42编译为 Go:
go
select {
case num := <-ch1:
fmt.Printf("Received number: %d\n", num)
case str := <-ch2:
fmt.Printf("Received string: %s\n", str)
}带 default 的 Select
使用 default 分支实现非阻塞操作:
typescript
let ch = chan.make<int>();
switch (chan.$select) {
case ch.$recv:
let val = ch.$value;
console.log(`Received: ${val}`);
break;
default:
console.log("No data available");
}
// 输出:No data available发送操作
Select 也可以用于发送操作:
typescript
let ch1 = chan.make<int>(1);
let ch2 = chan.make<int>(1);
switch (chan.$select) {
case ch1.$send(42):
console.log("Sent to ch1");
break;
case ch2.$send(100):
console.log("Sent to ch2");
break;
default:
console.log("No channel ready");
}常见并发模式
1. Worker Pool
使用多个 goroutine 处理任务队列:
typescript
function worker(id: int, jobs: chan<int>, results: chan<int>): void {
for (const job of jobs) {
console.log(`Worker ${id} processing job ${job}`);
time.Sleep(1 * time.Second);
results.send(job * 2);
}
}
// 创建 channel
let jobs = chan.make<int>(100);
let results = chan.make<int>(100);
// 启动 3 个 worker
for (let w = 1; w <= 3; w++) {
go(() => worker(w, jobs, results));
}
// 发送 5 个任务
for (let j = 1; j <= 5; j++) {
jobs.send(j);
}
jobs.close();
// 收集结果
for (let a = 1; a <= 5; a++) {
console.log(`Result: ${results.receive()}`);
}2. 超时控制
使用 select 和 timer 实现超时:
typescript
let ch = chan.make<string>();
go(() => {
time.Sleep(2 * time.Second);
ch.send("result");
});
let timeout = time.After(1 * time.Second);
switch (chan.$select) {
case ch.$recv:
let result = ch.$value;
console.log(`Got result: ${result}`);
break;
case timeout.$recv:
console.log("Timeout!");
break;
}
// 输出:Timeout!3. 扇出/扇入(Fan-out/Fan-in)
多个 goroutine 从同一个 channel 读取(扇出),结果汇总到一个 channel(扇入):
typescript
function producer(ch: chan<int>): void {
for (let i = 0; i < 10; i++) {
ch.send(i);
}
ch.close();
}
function square(input: chan<int>, output: chan<int>): void {
for (const n of input) {
output.send(n * n);
}
}
// 扇出:多个 worker 处理
let input = chan.make<int>(10);
let output = chan.make<int>(10);
go(() => producer(input));
// 启动 3 个 worker
for (let i = 0; i < 3; i++) {
go(() => square(input, output));
}
// 扇入:收集结果
// 注意:需要知道总共有多少个结果
for (let i = 0; i < 10; i++) {
console.log(output.receive());
}4. 管道(Pipeline)
将多个处理阶段连接起来:
typescript
function generate(nums: int[]): chan<int> {
let out = chan.make<int>();
go(() => {
for (const n of nums) {
out.send(n);
}
out.close();
});
return out;
}
function square(input: chan<int>): chan<int> {
let out = chan.make<int>();
go(() => {
for (const n of input) {
out.send(n * n);
}
out.close();
});
return out;
}
// 构建管道
let nums = [1, 2, 3, 4, 5];
let ch1 = generate(nums);
let ch2 = square(ch1);
// 消费结果
for (const result of ch2) {
console.log(result);
}
// 输出:1, 4, 9, 16, 25TypeScript 对比
异步操作
| 特性 | TypeScript | Targo |
|---|---|---|
| 并发原语 | Promise | goroutine + channel |
| 启动异步 | async function | go(() => ...) |
| 等待结果 | await promise | ch.receive() |
| 多路复用 | Promise.race() | select 语句 |
| 错误处理 | try/catch | 错误返回值 |
代码对比
TypeScript:
typescript
async function fetchData(url: string): Promise<string> {
const response = await fetch(url);
return await response.text();
}
async function main() {
const promise1 = fetchData("https://api1.com");
const promise2 = fetchData("https://api2.com");
const result = await Promise.race([promise1, promise2]);
console.log(result);
}Targo:
typescript
function fetchData(url: string, result: chan<string>): void {
// 模拟 HTTP 请求
let data = http.Get(url);
result.send(data);
}
function main(): void {
let ch1 = chan.make<string>();
let ch2 = chan.make<string>();
go(() => fetchData("https://api1.com", ch1));
go(() => fetchData("https://api2.com", ch2));
switch (chan.$select) {
case ch1.$recv:
console.log(ch1.$value);
break;
case ch2.$recv:
console.log(ch2.$value);
break;
}
}实际示例
并发 HTTP 请求
typescript
import { http } from "net/http";
import { io } from "io";
interface Result {
url: string;
status: int;
body: string;
}
function fetch(url: string, results: chan<Result>): void {
let [resp, err] = http.Get(url);
if (err != null) {
results.send({ url, status: 0, body: err.Error() });
return;
}
let [body, _] = io.ReadAll(resp.Body);
resp.Body.Close();
results.send({
url,
status: resp.StatusCode,
body: string(body)
});
}
// 并发请求多个 URL
let urls = [
"https://api.github.com",
"https://api.twitter.com",
"https://api.reddit.com"
];
let results = chan.make<Result>(urls.length);
for (const url of urls) {
go(() => fetch(url, results));
}
// 收集结果
for (let i = 0; i < urls.length; i++) {
let result = results.receive();
console.log(`${result.url}: ${result.status}`);
}限流器(Rate Limiter)
typescript
function rateLimiter(rate: int): chan<time.Time> {
let ticker = time.NewTicker(time.Second / rate);
return ticker.C;
}
// 每秒最多 5 个请求
let limiter = rateLimiter(5);
for (let i = 0; i < 20; i++) {
limiter.receive(); // 等待限流器
go(() => {
console.log(`Request ${i} at ${time.Now()}`);
// 执行请求...
});
}常见陷阱
1. Goroutine 泄漏
typescript
// ❌ 错误:goroutine 永远不会结束
function leak(): void {
let ch = chan.make<int>();
go(() => {
ch.receive(); // 永远阻塞
});
// ch 没有被发送数据
}
// ✅ 正确:使用 context 或 timeout
function noLeak(): void {
let ch = chan.make<int>();
let done = chan.make<bool>();
go(() => {
switch (chan.$select) {
case ch.$recv:
console.log(ch.$value);
break;
case done.$recv:
return;
}
});
// 清理
done.close();
}2. 忘记关闭 Channel
typescript
// ❌ 错误:循环永远不会结束
let ch = chan.make<int>(3);
ch.send(1);
ch.send(2);
ch.send(3);
// 忘记 ch.close()
for (const val of ch) { // 永远阻塞
console.log(val);
}
// ✅ 正确:关闭 channel
ch.close();
for (const val of ch) {
console.log(val);
}3. 向已关闭的 Channel 发送
typescript
let ch = chan.make<int>();
ch.close();
// ❌ 错误:panic!
ch.send(42);最佳实践
使用 channel 传递数据所有权
- 避免共享内存,通过 channel 通信
发送者关闭 channel
- 只有发送者应该关闭 channel
- 接收者不应该关闭 channel
使用 buffered channel 避免阻塞
- 根据场景选择合适的缓冲大小
- 无缓冲 channel 用于同步
使用 select 处理超时
- 避免 goroutine 永久阻塞
- 使用
time.After()实现超时
避免 goroutine 泄漏
- 确保所有 goroutine 都能正常退出
- 使用 context 或 done channel