Skip to content

并发编程

本指南介绍 Targo 中的并发编程模型,包括 goroutine、channel 和 select 语句。

目标读者

  • 熟悉 TypeScript async/await 的开发者
  • 想要理解 Go 并发模型的开发者
  • 需要编写高性能并发程序的开发者

核心概念

Targo 采用 Go 的并发模型:

  • goroutine - 轻量级线程,使用 go() 函数启动
  • channel - goroutine 之间的通信管道
  • select - 多路复用,同时等待多个 channel 操作

与 TypeScript 的区别

TypeScript 使用 Promiseasync/await 处理异步操作。Targo 使用 goroutine 和 channel 实现真正的并发。

Goroutine

启动 Goroutine

使用 go() 函数启动一个新的 goroutine:

typescript
import { go } from "@targo/runtime";

// 启动一个 goroutine
go(() => {
    console.log("Hello from goroutine!");
});

// 主程序继续执行
console.log("Hello from main!");

// 输出顺序不确定:
// Hello from main!
// Hello from goroutine!
// 或
// Hello from goroutine!
// Hello from main!

编译为 Go:

go
go func() {
    fmt.Println("Hello from goroutine!")
}()

fmt.Println("Hello from main!")

带参数的 Goroutine

typescript
function printNumbers(from: int, to: int): void {
    for (let i = from; i <= to; i++) {
        console.log(i);
        time.Sleep(100 * time.Millisecond);
    }
}

// 启动多个 goroutine
go(() => printNumbers(1, 5));
go(() => printNumbers(10, 15));

// 等待 goroutine 完成
time.Sleep(1 * time.Second);

Goroutine 的特点

特性说明
轻量级每个 goroutine 只占用几 KB 内存
并发执行多个 goroutine 可以同时运行
调度器管理Go 运行时自动调度 goroutine
无返回值goroutine 不能直接返回值,使用 channel 通信

Channel

Channel 是 goroutine 之间通信的管道。

创建 Channel

typescript
// 无缓冲 channel(同步)
let ch = chan.make<int>();

// 有缓冲 channel(异步)
let buffered = chan.make<string>(10);  // 缓冲大小 10

编译为 Go:

go
ch := make(chan int)
buffered := make(chan string, 10)

发送和接收

typescript
let ch = chan.make<int>();

// 在 goroutine 中发送
go(() => {
    ch.send(42);
});

// 在主程序中接收
let value = ch.receive();
console.log(value);  // 42

编译为 Go:

go
ch := make(chan int)

go func() {
    ch <- 42
}()

value := <-ch
fmt.Println(value)

无缓冲 vs 有缓冲 Channel

无缓冲 channel(同步):

typescript
let ch = chan.make<int>();

// 发送会阻塞,直到有接收者
go(() => {
    console.log("Sending...");
    ch.send(42);
    console.log("Sent!");
});

time.Sleep(1 * time.Second);
console.log("Receiving...");
let val = ch.receive();
console.log(`Received: ${val}`);

// 输出:
// Sending...
// Receiving...
// Received: 42
// Sent!

有缓冲 channel(异步):

typescript
let ch = chan.make<int>(2);  // 缓冲大小 2

// 可以发送 2 个值而不阻塞
ch.send(1);
ch.send(2);
console.log("Sent 2 values");

// 第 3 个发送会阻塞
go(() => {
    ch.send(3);  // 阻塞,直到有接收者
    console.log("Sent 3rd value");
});

time.Sleep(1 * time.Second);
console.log(ch.receive());  // 1
console.log(ch.receive());  // 2
console.log(ch.receive());  // 3

关闭 Channel

typescript
let ch = chan.make<int>(3);

// 发送数据
ch.send(1);
ch.send(2);
ch.send(3);

// 关闭 channel
ch.close();

// 从已关闭的 channel 接收
let [val1, ok1] = ch.tryReceive();  // [1, true]
let [val2, ok2] = ch.tryReceive();  // [2, true]
let [val3, ok3] = ch.tryReceive();  // [3, true]
let [val4, ok4] = ch.tryReceive();  // [0, false] - channel 已空且已关闭

注意

  • 只有发送者应该关闭 channel
  • 向已关闭的 channel 发送会 panic
  • 从已关闭的 channel 接收会返回零值

迭代 Channel

使用 for...of 循环迭代 channel,直到它被关闭:

typescript
let ch = chan.make<string>(3);

// 在 goroutine 中发送数据
go(() => {
    ch.send("hello");
    ch.send("world");
    ch.send("!");
    ch.close();  // 必须关闭,否则循环会永久阻塞
});

// 迭代直到 channel 关闭
for (const msg of ch) {
    console.log(msg);
}

// 输出:
// hello
// world
// !

Select 语句

Select 允许同时等待多个 channel 操作,类似于 switch 语句。

基本用法

typescript
let ch1 = chan.make<int>();
let ch2 = chan.make<string>();

// 在 goroutine 中发送数据
go(() => {
    time.Sleep(1 * time.Second);
    ch1.send(42);
});

go(() => {
    time.Sleep(2 * time.Second);
    ch2.send("hello");
});

// 等待第一个可用的 channel
switch (chan.$select) {
    case ch1.$recv:
        let num = ch1.$value;
        console.log(`Received number: ${num}`);
        break;
    
    case ch2.$recv:
        let str = ch2.$value;
        console.log(`Received string: ${str}`);
        break;
}

// 输出:Received number: 42

编译为 Go:

go
select {
case num := <-ch1:
    fmt.Printf("Received number: %d\n", num)
case str := <-ch2:
    fmt.Printf("Received string: %s\n", str)
}

带 default 的 Select

使用 default 分支实现非阻塞操作:

typescript
let ch = chan.make<int>();

switch (chan.$select) {
    case ch.$recv:
        let val = ch.$value;
        console.log(`Received: ${val}`);
        break;
    
    default:
        console.log("No data available");
}

// 输出:No data available

发送操作

Select 也可以用于发送操作:

typescript
let ch1 = chan.make<int>(1);
let ch2 = chan.make<int>(1);

switch (chan.$select) {
    case ch1.$send(42):
        console.log("Sent to ch1");
        break;
    
    case ch2.$send(100):
        console.log("Sent to ch2");
        break;
    
    default:
        console.log("No channel ready");
}

常见并发模式

1. Worker Pool

使用多个 goroutine 处理任务队列:

typescript
function worker(id: int, jobs: chan<int>, results: chan<int>): void {
    for (const job of jobs) {
        console.log(`Worker ${id} processing job ${job}`);
        time.Sleep(1 * time.Second);
        results.send(job * 2);
    }
}

// 创建 channel
let jobs = chan.make<int>(100);
let results = chan.make<int>(100);

// 启动 3 个 worker
for (let w = 1; w <= 3; w++) {
    go(() => worker(w, jobs, results));
}

// 发送 5 个任务
for (let j = 1; j <= 5; j++) {
    jobs.send(j);
}
jobs.close();

// 收集结果
for (let a = 1; a <= 5; a++) {
    console.log(`Result: ${results.receive()}`);
}

2. 超时控制

使用 select 和 timer 实现超时:

typescript
let ch = chan.make<string>();

go(() => {
    time.Sleep(2 * time.Second);
    ch.send("result");
});

let timeout = time.After(1 * time.Second);

switch (chan.$select) {
    case ch.$recv:
        let result = ch.$value;
        console.log(`Got result: ${result}`);
        break;
    
    case timeout.$recv:
        console.log("Timeout!");
        break;
}

// 输出:Timeout!

3. 扇出/扇入(Fan-out/Fan-in)

多个 goroutine 从同一个 channel 读取(扇出),结果汇总到一个 channel(扇入):

typescript
function producer(ch: chan<int>): void {
    for (let i = 0; i < 10; i++) {
        ch.send(i);
    }
    ch.close();
}

function square(input: chan<int>, output: chan<int>): void {
    for (const n of input) {
        output.send(n * n);
    }
}

// 扇出:多个 worker 处理
let input = chan.make<int>(10);
let output = chan.make<int>(10);

go(() => producer(input));

// 启动 3 个 worker
for (let i = 0; i < 3; i++) {
    go(() => square(input, output));
}

// 扇入:收集结果
// 注意:需要知道总共有多少个结果
for (let i = 0; i < 10; i++) {
    console.log(output.receive());
}

4. 管道(Pipeline)

将多个处理阶段连接起来:

typescript
function generate(nums: int[]): chan<int> {
    let out = chan.make<int>();
    go(() => {
        for (const n of nums) {
            out.send(n);
        }
        out.close();
    });
    return out;
}

function square(input: chan<int>): chan<int> {
    let out = chan.make<int>();
    go(() => {
        for (const n of input) {
            out.send(n * n);
        }
        out.close();
    });
    return out;
}

// 构建管道
let nums = [1, 2, 3, 4, 5];
let ch1 = generate(nums);
let ch2 = square(ch1);

// 消费结果
for (const result of ch2) {
    console.log(result);
}

// 输出:1, 4, 9, 16, 25

TypeScript 对比

异步操作

特性TypeScriptTargo
并发原语Promisegoroutine + channel
启动异步async functiongo(() => ...)
等待结果await promisech.receive()
多路复用Promise.race()select 语句
错误处理try/catch错误返回值

代码对比

TypeScript

typescript
async function fetchData(url: string): Promise<string> {
    const response = await fetch(url);
    return await response.text();
}

async function main() {
    const promise1 = fetchData("https://api1.com");
    const promise2 = fetchData("https://api2.com");
    
    const result = await Promise.race([promise1, promise2]);
    console.log(result);
}

Targo

typescript
function fetchData(url: string, result: chan<string>): void {
    // 模拟 HTTP 请求
    let data = http.Get(url);
    result.send(data);
}

function main(): void {
    let ch1 = chan.make<string>();
    let ch2 = chan.make<string>();
    
    go(() => fetchData("https://api1.com", ch1));
    go(() => fetchData("https://api2.com", ch2));
    
    switch (chan.$select) {
        case ch1.$recv:
            console.log(ch1.$value);
            break;
        case ch2.$recv:
            console.log(ch2.$value);
            break;
    }
}

实际示例

并发 HTTP 请求

typescript
import { http } from "net/http";
import { io } from "io";

interface Result {
    url: string;
    status: int;
    body: string;
}

function fetch(url: string, results: chan<Result>): void {
    let [resp, err] = http.Get(url);
    if (err != null) {
        results.send({ url, status: 0, body: err.Error() });
        return;
    }
    
    let [body, _] = io.ReadAll(resp.Body);
    resp.Body.Close();
    
    results.send({
        url,
        status: resp.StatusCode,
        body: string(body)
    });
}

// 并发请求多个 URL
let urls = [
    "https://api.github.com",
    "https://api.twitter.com",
    "https://api.reddit.com"
];

let results = chan.make<Result>(urls.length);

for (const url of urls) {
    go(() => fetch(url, results));
}

// 收集结果
for (let i = 0; i < urls.length; i++) {
    let result = results.receive();
    console.log(`${result.url}: ${result.status}`);
}

限流器(Rate Limiter)

typescript
function rateLimiter(rate: int): chan<time.Time> {
    let ticker = time.NewTicker(time.Second / rate);
    return ticker.C;
}

// 每秒最多 5 个请求
let limiter = rateLimiter(5);

for (let i = 0; i < 20; i++) {
    limiter.receive();  // 等待限流器
    go(() => {
        console.log(`Request ${i} at ${time.Now()}`);
        // 执行请求...
    });
}

常见陷阱

1. Goroutine 泄漏

typescript
// ❌ 错误:goroutine 永远不会结束
function leak(): void {
    let ch = chan.make<int>();
    go(() => {
        ch.receive();  // 永远阻塞
    });
    // ch 没有被发送数据
}

// ✅ 正确:使用 context 或 timeout
function noLeak(): void {
    let ch = chan.make<int>();
    let done = chan.make<bool>();
    
    go(() => {
        switch (chan.$select) {
            case ch.$recv:
                console.log(ch.$value);
                break;
            case done.$recv:
                return;
        }
    });
    
    // 清理
    done.close();
}

2. 忘记关闭 Channel

typescript
// ❌ 错误:循环永远不会结束
let ch = chan.make<int>(3);
ch.send(1);
ch.send(2);
ch.send(3);
// 忘记 ch.close()

for (const val of ch) {  // 永远阻塞
    console.log(val);
}

// ✅ 正确:关闭 channel
ch.close();
for (const val of ch) {
    console.log(val);
}

3. 向已关闭的 Channel 发送

typescript
let ch = chan.make<int>();
ch.close();

// ❌ 错误:panic!
ch.send(42);

最佳实践

  1. 使用 channel 传递数据所有权

    • 避免共享内存,通过 channel 通信
  2. 发送者关闭 channel

    • 只有发送者应该关闭 channel
    • 接收者不应该关闭 channel
  3. 使用 buffered channel 避免阻塞

    • 根据场景选择合适的缓冲大小
    • 无缓冲 channel 用于同步
  4. 使用 select 处理超时

    • 避免 goroutine 永久阻塞
    • 使用 time.After() 实现超时
  5. 避免 goroutine 泄漏

    • 确保所有 goroutine 都能正常退出
    • 使用 context 或 done channel

下一步

参考