presto源码阅读1:从cli提交任务说起

presto的server和client是通过Restful接口进行交互的,可以通过各种语言的客户端提交查询,也可以通过命令行cli提交查询。LZ这里就用最简单的cli来进行分析。

1. 入口类Presto

Console console = singleCommand(Console.class).parse(args);
if (console.helpOption.showHelpIfRequested() ||
        console.versionOption.showVersionIfRequested()) {
    return;
}
System.exit(console.run() ? 0 : 1);

如果参数中带有–help或者–version就直接返回,否则就进行查询。

2. Console的run方法

public boolean run() {
    ...
    boolean hasQuery = !isNullOrEmpty(clientOptions.execute);
    boolean isFromFile = !isNullOrEmpty(clientOptions.file);
    ...
    if (hasQuery) {
        return executeCommand(queryRunner, query, clientOptions.outputFormat, clientOptions.ignoreErrors);
    }
    ...
}

参数–execute表示执行其后的sql;参数–file表示执行其后文件里的sql。如果这两个参数都不存在,则进入cli交互界面。LZ这里使用–execute为例。

3. Console的executeCommand方法

private static boolean executeCommand(QueryRunner queryRunner, String query, OutputFormat outputFormat, boolean ignoreErrors) {
    boolean success = true;
    StatementSplitter splitter = new StatementSplitter(query);
    for (Statement split : splitter.getCompleteStatements()) {
        if (!isEmptyStatement(split.statement())) {
            if (!process(queryRunner, split.statement(), outputFormat, () -> {}, false)) {
                if (!ignoreErrors) {
                    return false;
                }
                success = false;
            }
        }
    }
    ...
}

参数–execute后的sql语句可以传多个,默认使用“;”来分隔。StatementSplitter把多个sql语句拆分开来一个一个执行。

4. Console的process方法

private static boolean process(QueryRunner queryRunner, String sql, OutputFormat outputFormat, Runnable schemaChanged, boolean interactive) {
    ...
    try (Query query = queryRunner.startQuery(finalSql)) {
        boolean success = query.renderOutput(System.out, outputFormat, interactive);
        ...
    }
    ...
}

对于一次查询,是分为两步执行的:初始查询和后续循环查询,分别通过startQuery和renderOutput方法来完成的。初始查询的时候,不返回数据,只返回nextUri。后续查询都是使用上一个查询返回的nextUri来查询下一批数据的。

5. QueryRunner的startQuery方法 -> StatementClientV1的构造方法

public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query) {
    ...
    Request request = buildQueryRequest(session, query);
    JsonResponse<QueryResults> response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request);
    ...
    processResponse(response.getHeaders(), response.getValue());
}

sql是通过httpClient发送到server端的。上面那个response里面没有数据,只有nextUri。

6. StatementClientV1的buildQueryRequest方法

private Request buildQueryRequest(ClientSession session, String query) {
    HttpUrl url = HttpUrl.get(session.getServer());
    ...
    url = url.newBuilder().encodedPath("/v1/statement").build();
    ...
}

getServer获取到的是参数–server后的值,/v1/statement是执行查询的路径,该路径只是在第一次提交的时候才会调用,后面都是调用nextUri。

7. Query的renderOutput方法

public boolean renderOutput(PrintStream out, OutputFormat outputFormat, boolean interactive) {
    Thread clientThread = Thread.currentThread();
    // 处理kill -2
    SignalHandler oldHandler = Signal.handle(SIGINT, signal -> {
        if (ignoreUserInterrupt.get() || client.isClientAborted()) {
            return;
        }
        client.close();
        clientThread.interrupt();
    });
    try {
        return renderQueryOutput(out, outputFormat, interactive);
    }
    ...
}

对信号量进行处理,可以推断出renderQueryOutput方法是个阻塞方法。

8. Query的renderQueryOutput方法

private boolean renderQueryOutput(PrintStream out, OutputFormat outputFormat, boolean interactive) {
    ...
    if (interactive) {
        statusPrinter = new StatusPrinter(client, out, debug);
        statusPrinter.printInitialStatusUpdates();
    }
    else {
        processInitialStatusUpdates(warningsPrinter);
    }
    ...
}

interactive=true的时候,数据就在屏幕上持续输出,否则就等待结果。

9. StatusPrinter的printInitialStatusUpdates方法

public void printInitialStatusUpdates() {
    long lastPrint = System.nanoTime();
    try {
        WarningsPrinter warningsPrinter = new ConsoleWarningsPrinter(console);
        // #1
        while (client.isRunning()) {
            try {
                // client还有未处理完毕的数据,直接退出循环
                if (client.currentData().getData() != null) {
                    return;
                }
                ...
            }
            ...
            client.advance();
        }
        ...
    }
    ...
}

当client状态为Running的时候,client使用advance方法不断的向server请求数据。

10. StatementClientV1的advance方法
StatementClientV1是StatementClient接口的实现类

public boolean advance() {
    ...
    URI nextUri = currentStatusInfo().getNextUri();
    if (nextUri == null) {
        // 对应9的#1,设置client状态为FINISHED
        state.compareAndSet(State.RUNNING, State.FINISHED);
        return false;
    }
    Request request = prepareRequest(HttpUrl.get(nextUri)).build();
    ...
}

currentStatusInfo方法获取的是当前的请求结果,为了保证线程安全,结果保存在AtomicReference中。nextUri是由Coordinator生成,内容是/v1/statement/queued/{queryId}/{token}。通过以上的方式,实现了动态打印数据。

11. 流程图
presto_cli
图片来自《Presto技术内幕》

发表评论