presto的server和client是通过Restful接口进行交互的,可以通过各种语言的客户端提交查询,也可以通过命令行cli提交查询。LZ这里就用最简单的cli来进行分析。
1. 入口类Presto
1 2 3 4 5 6 7 |
Console console = singleCommand(Console.class).parse(args); if (console.helpOption.showHelpIfRequested() || console.versionOption.showVersionIfRequested()) { return; } System.exit(console.run() ? 0 : 1); |
如果参数中带有–help或者–version就直接返回,否则就进行查询。
2. Console的run方法
1 2 3 4 5 6 7 8 9 10 11 |
public boolean run() { ... boolean hasQuery = !isNullOrEmpty(clientOptions.execute); boolean isFromFile = !isNullOrEmpty(clientOptions.file); ... if (hasQuery) { return executeCommand(queryRunner, query, clientOptions.outputFormat, clientOptions.ignoreErrors); } ... } |
参数–execute表示执行其后的sql;参数–file表示执行其后文件里的sql。如果这两个参数都不存在,则进入cli交互界面。LZ这里使用–execute为例。
3. Console的executeCommand方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
private static boolean executeCommand(QueryRunner queryRunner, String query, OutputFormat outputFormat, boolean ignoreErrors) { boolean success = true; StatementSplitter splitter = new StatementSplitter(query); for (Statement split : splitter.getCompleteStatements()) { if (!isEmptyStatement(split.statement())) { if (!process(queryRunner, split.statement(), outputFormat, () -> {}, false)) { if (!ignoreErrors) { return false; } success = false; } } } ... } |
参数–execute后的sql语句可以传多个,默认使用“;”来分隔。StatementSplitter把多个sql语句拆分开来一个一个执行。
4. Console的process方法
1 2 3 4 5 6 7 8 9 |
private static boolean process(QueryRunner queryRunner, String sql, OutputFormat outputFormat, Runnable schemaChanged, boolean interactive) { ... try (Query query = queryRunner.startQuery(finalSql)) { boolean success = query.renderOutput(System.out, outputFormat, interactive); ... } ... } |
对于一次查询,是分为两步执行的:初始查询和后续循环查询,分别通过startQuery和renderOutput方法来完成的。初始查询的时候,不返回数据,只返回nextUri。后续查询都是使用上一个查询返回的nextUri来查询下一批数据的。
5. QueryRunner的startQuery方法 -> StatementClientV1的构造方法
1 2 3 4 5 6 7 8 |
public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query) { ... Request request = buildQueryRequest(session, query); JsonResponse<QueryResults> response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request); ... processResponse(response.getHeaders(), response.getValue()); } |
sql是通过httpClient发送到server端的。上面那个response里面没有数据,只有nextUri。
6. StatementClientV1的buildQueryRequest方法
1 2 3 4 5 6 7 |
private Request buildQueryRequest(ClientSession session, String query) { HttpUrl url = HttpUrl.get(session.getServer()); ... url = url.newBuilder().encodedPath("/v1/statement").build(); ... } |
getServer获取到的是参数–server后的值,/v1/statement是执行查询的路径,该路径只是在第一次提交的时候才会调用,后面都是调用nextUri。
7. Query的renderOutput方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public boolean renderOutput(PrintStream out, OutputFormat outputFormat, boolean interactive) { Thread clientThread = Thread.currentThread(); // 处理kill -2 SignalHandler oldHandler = Signal.handle(SIGINT, signal -> { if (ignoreUserInterrupt.get() || client.isClientAborted()) { return; } client.close(); clientThread.interrupt(); }); try { return renderQueryOutput(out, outputFormat, interactive); } ... } |
对信号量进行处理,可以推断出renderQueryOutput方法是个阻塞方法。
8. Query的renderQueryOutput方法
1 2 3 4 5 6 7 8 9 10 11 12 |
private boolean renderQueryOutput(PrintStream out, OutputFormat outputFormat, boolean interactive) { ... if (interactive) { statusPrinter = new StatusPrinter(client, out, debug); statusPrinter.printInitialStatusUpdates(); } else { processInitialStatusUpdates(warningsPrinter); } ... } |
interactive=true的时候,数据就在屏幕上持续输出,否则就等待结果。
9. StatusPrinter的printInitialStatusUpdates方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
public void printInitialStatusUpdates() { long lastPrint = System.nanoTime(); try { WarningsPrinter warningsPrinter = new ConsoleWarningsPrinter(console); // #1 while (client.isRunning()) { try { // client还有未处理完毕的数据,直接退出循环 if (client.currentData().getData() != null) { return; } ... } ... client.advance(); } ... } ... } |
当client状态为Running的时候,client使用advance方法不断的向server请求数据。
10. StatementClientV1的advance方法
StatementClientV1是StatementClient接口的实现类
1 2 3 4 5 6 7 8 9 10 11 12 |
public boolean advance() { ... URI nextUri = currentStatusInfo().getNextUri(); if (nextUri == null) { // 对应9的#1,设置client状态为FINISHED state.compareAndSet(State.RUNNING, State.FINISHED); return false; } Request request = prepareRequest(HttpUrl.get(nextUri)).build(); ... } |
currentStatusInfo方法获取的是当前的请求结果,为了保证线程安全,结果保存在AtomicReference中。nextUri是由Coordinator生成,内容是/v1/statement/queued/{queryId}/{token}。通过以上的方式,实现了动态打印数据。
11. 流程图
图片来自《Presto技术内幕》
95 Replies to “presto源码阅读1:从cli提交任务说起”