ICFPC'09 и ускорение интерпретатора
Jul. 4th, 2009 10:19 pmНесколько дней назад закончилось соревнование ICFP Contest 2009, мой небольшой отчет об участии в котором можно почитать здесь. Там в задании была описана несложная виртуальная машина, на которой запускались данные организаторами программы для моделирования орбитальной механики. Тут я расскажу, как, используя возможности функционального языка программирования, можно заметно ускорить интерпретатор.
Update: Это НЕ попытка сравнить скорости С++ и Окамла. Это НЕ попытка описать самый быстрый подход к интерпретации. Это просто техническая часть отчета об ICFPC с рассказом о примененном мною там приеме.
ВМ имеет память из 16384 вещественных чисел и программу не большей длины. Программа состоит из последовательности команд, выполняющихся по порядку, без ветвлений. Команды бывают без аргументов, с одним или с двумя аргументами. Для большинства команд результат выполнения (вещественное число) записывается в ячейку памяти с номером, равным номеру команды в программе. Аргументы берутся из памяти, их адреса закодированы в одном двойном слове с командой. Есть одноаргументные операции сравнения с нулем, их результат (истина или ложь) сохраняется в неком регистре статуса, который потом используется в команде выбора, записывающей в свою ячейку содержимое либо первого аргумента, либо второго, в зависимости от текущего значения статуса.
С учетом способа кодирования инструкций и аргументов, интерпретатор этой ВМ на С++ выглядит так:
Такая реализация на моей рабочей машинке 2,33 ГГц выполняет первую программу из 266 команд 377 000 раз в секунду. Но при участии в ICPFC я все писал на OCaml'e. На нем систему команд ВМ я описал в виде алгебраического типа, расшифровку команд и аргументов вынес отдельно, и интерпретатор выглядел так:
Тут после собственно исполнения программы идет увеличение счетчика времени и проверка на выполнение задания, в варианте на С++ их нет. Такой интепретатор на Окамле наглядней, меньше подвержен ошибкам, и написался очень быстро, но работал несколько медленнее: только 263 000 итераций в секунду. Захотелось его ускорить, и были сделаны две оптимизации.
Оптимизация первая.
В исходном виде интерпретатора у нас в цикле крутится switch. Но от него можно избавиться, вынеся его за пределы цикла. Для этого мы каждую команду программы превратим в функцию. Она будет выполнять предписанное командой действие и передавать управление следующей такой функции. Вместо пробегания по программе в цикле мы будем вызывать функцию для первой команды, она - функцию для второй и т.д. При "компиляции" каждой команды нам нужно иметь уже "скомпилированную" функцию для следующей, поэтому эту псевдокомпиляцию будем проводить с конца программы. Самой последней команде в качестве такой функции-продолжения мы дадим функцию, выполняющую действия, которые нужно сделать в конце каждой итерации - увеличение счетчика времени и проверка очков. У ВМ есть особая инструкция Noop (у меня она обозначена более привычно Nop), которая ничего не делает. Для нее новой функции создаваться не будет, а просто полученное продолжение будет передаваться дальше. Благодаря этому все nop'ы и их последовательности будут вообще бесплатны. Отдельным образом будут "компилироваться" команды сравнения и выбора. Они всегда идут подряд парами - сначала сравнение аргумента с нулем Cmp*, затем выбор результата Phi. По сути это одна трехаргументная команда. Поэтому последовательность этих двух команд будет "компилироваться" в одну функцию. Для этого результатом "компиляции" Phi (которая в программе идет позже, поэтому "компилируется" раньше) будет не функция-продолжение, а структура с ее аргументами. При компиляции команды сравнения эта структура будет использоваться для создания функции, объединяющей в себе Cmp* и Phi.
Опишем тип результата псевдокомпиляции. Это или функция-продолжение, или аргументы Phi и ее продолжение:
Псевдокомпиляция команды ВМ в такой результат делается просто:
А псевдокомпиляция всей программы будет выглядеть так:
Функция comp_prg получает на вход структуру ВМ, содержащую программу и память, а на выход выдает функцию, которая выполняет один проход программы данного экземпляра ВМ. Ее можно вызывать вместо interpret vm, и скорость получается уже 417 000 итераций в секунду, т.е. уже быстрее, чем реализация ВМ на С++. Но и это не предел.
Оптимизация вторая.
На каждой итерации ВМ читает какие-то данные из входных портов, производит вычисления, зависящие от них и от состояния памяти, пишет что-то в память и выдает что-то на выходные порты. В данном случае на входе были моментальные изменения скорости спутника, а на выходе его координаты, остаток топлива и другие данные. Большую часть времени спутник просто летит по орбите без использования двигателей, т.е. значения входных портов для большинства итераций одни и те же. Это можно использовать и убрать из цикла вычисления, зависящие от неизменяющихся данных, записав вместо них Nop'ы с уже посчитанным результатом. Алгоритм оптимизации очень прост. Заводим массив булевых значений, показывающих для каждой команды является ли ее результат константой при условии константности входов. Изначально это истина для команд Input и Nop и ложь для всех остальных. Дальше проходим по программе и для каждой операции кроме Output смотрим не являются ли константами ее аргументы. Если да, то вычисляем результат операции, записываем в нужную ячейку памяти, а команду заменяем на Nop, пометив ее как константу. Так делаем несколько раз: на каждом проходе число константных операций или увеличивается (когда заменили очередную команду) или не изменяется (когда больше уже нечего заменить). Если после прохода ничего не изменилось, значит мы попали в Fixed Point и оптимизация готова, возвращаем новый экземпляр ВМ с новой программой и новым содержимым памяти:
Такая оптимизация на первой задаче, например, увеличивает количество пустых команд с 11% до 39%. Оптимизированная программа подвергается описанной выше псевдокомпиляции, и скорость возрастает до 650 000 итераций в секунду, что в 1,7 раза быстрее варианта на С++ и в 2,5 раза быстрее исходного интепретатора на Окамле. Если наш спутник меняет траекторию, то в этом момент используется неоптимизированный вариант, а после того, как он ляжет на новую орбиту с выключенным двигателем, текущий экземпляр ВМ проходит оптимизацию и опять получаем быстрый ваприант уже с новыми значениями.
Update: Это НЕ попытка сравнить скорости С++ и Окамла. Это НЕ попытка описать самый быстрый подход к интерпретации. Это просто техническая часть отчета об ICFPC с рассказом о примененном мною там приеме.
ВМ имеет память из 16384 вещественных чисел и программу не большей длины. Программа состоит из последовательности команд, выполняющихся по порядку, без ветвлений. Команды бывают без аргументов, с одним или с двумя аргументами. Для большинства команд результат выполнения (вещественное число) записывается в ячейку памяти с номером, равным номеру команды в программе. Аргументы берутся из памяти, их адреса закодированы в одном двойном слове с командой. Есть одноаргументные операции сравнения с нулем, их результат (истина или ложь) сохраняется в неком регистре статуса, который потом используется в команде выбора, записывающей в свою ячейку содержимое либо первого аргумента, либо второго, в зависимости от текущего значения статуса.
С учетом способа кодирования инструкций и аргументов, интерпретатор этой ВМ на С++ выглядит так:
void run()
{
for(int ip=0; ip<prglen; ip++) {
const int dop = prg[ip]>>28;
const int dr1 = (prg[ip] >> 14) & 0x3FFF;
const int dr2 = prg[ip] & 0x3FFF;
switch(dop) {
case 1: mem[ip] = mem[dr1] + mem[dr2]; break;
case 2: mem[ip] = mem[dr1] - mem[dr2]; break;
case 3: mem[ip] = mem[dr1] * mem[dr2]; break;
case 4: mem[ip] = (mem[dr2]==0.0) ? 0.0 : mem[dr1] / mem[dr2]; break;
case 5: out[dr1] = mem[dr2]; break;
case 6: mem[ip] = status ? mem[dr1] : mem[dr2]; break;
case 0: {
const int sop = prg[ip] >> 24;
const int imm = (prg[ip] >> 21) & 7;
const int sr1 = dr2;
switch(sop) {
case 0: break;
case 1: switch(imm) {
case 0: status = mem[sr1] < 0; break;
case 1: status = mem[sr1] <= 0; break;
case 2: status = mem[sr1] == 0; break;
case 3: status = mem[sr1] >= 0; break;
case 4: status = mem[sr1] > 0; break;
default: printf("bad imm=%d ip=%d\n", imm, ip); return;
} break;
case 2: mem[ip] = sqrt(mem[sr1]); break;
case 3: mem[ip] = mem[sr1]; break;
case 4: mem[ip] = inp[sr1]; break;
default: printf("bad sop=%d ip=%d\n", sop, ip); return;
}
}
break;
default: printf("bad dop=%d ip=%d\n", dop, ip); return;
}
}
}
Такая реализация на моей рабочей машинке 2,33 ГГц выполняет первую программу из 266 команд 377 000 раз в секунду. Но при участии в ICPFC я все писал на OCaml'e. На нем систему команд ВМ я описал в виде алгебраического типа, расшифровку команд и аргументов вынес отдельно, и интерпретатор выглядел так:
let eval_instr ip vm = function | Add(r1,r2) -> vm.mem.(ip) <- vm.mem.(r1) +. vm.mem.(r2) | Sub(r1,r2) -> vm.mem.(ip) <- vm.mem.(r1) -. vm.mem.(r2) | Mul(r1,r2) -> vm.mem.(ip) <- vm.mem.(r1) *. vm.mem.(r2) | Div(r1,r2) -> let v2 = vm.mem.(r2) in vm.mem.(ip) <- if v2=0.0 then 0.0 else vm.mem.(r1) /. v2 | Out(r1,r2) -> vm.outports.(r1) <- vm.mem.(r2) | Phi(r1,r2) -> vm.mem.(ip) <- if vm.status then vm.mem.(r1) else vm.mem.(r2) | Nop -> () | Cmpz(LTZ, r1) -> vm.status <- vm.mem.(r1) < 0.0 | Cmpz(LEZ, r1) -> vm.status <- vm.mem.(r1) <= 0.0 | Cmpz(EQZ, r1) -> vm.status <- vm.mem.(r1) = 0.0 | Cmpz(GEZ, r1) -> vm.status <- vm.mem.(r1) >= 0.0 | Cmpz(GTZ, r1) -> vm.status <- vm.mem.(r1) > 0.0 | Sqrt r1 -> vm.mem.(ip) <- if vm.mem.(r1) >= 0.0 then sqrt vm.mem.(r1) else 0.0 | Copy r1 -> vm.mem.(ip) <- vm.mem.(r1) | Inp r1 -> vm.mem.(ip) <- vm.inports.(r1);; let interpret vm = for ip = 0 to vm.prglen - 1 do eval_instr ip vm vm.prg.(ip) done; vm.time <- vm.time + 1; if vm.outports.(0)>0.0 && vm.scoretime=0 then vm.scoretime <- vm.time;;
Тут после собственно исполнения программы идет увеличение счетчика времени и проверка на выполнение задания, в варианте на С++ их нет. Такой интепретатор на Окамле наглядней, меньше подвержен ошибкам, и написался очень быстро, но работал несколько медленнее: только 263 000 итераций в секунду. Захотелось его ускорить, и были сделаны две оптимизации.
Оптимизация первая.
В исходном виде интерпретатора у нас в цикле крутится switch. Но от него можно избавиться, вынеся его за пределы цикла. Для этого мы каждую команду программы превратим в функцию. Она будет выполнять предписанное командой действие и передавать управление следующей такой функции. Вместо пробегания по программе в цикле мы будем вызывать функцию для первой команды, она - функцию для второй и т.д. При "компиляции" каждой команды нам нужно иметь уже "скомпилированную" функцию для следующей, поэтому эту псевдокомпиляцию будем проводить с конца программы. Самой последней команде в качестве такой функции-продолжения мы дадим функцию, выполняющую действия, которые нужно сделать в конце каждой итерации - увеличение счетчика времени и проверка очков. У ВМ есть особая инструкция Noop (у меня она обозначена более привычно Nop), которая ничего не делает. Для нее новой функции создаваться не будет, а просто полученное продолжение будет передаваться дальше. Благодаря этому все nop'ы и их последовательности будут вообще бесплатны. Отдельным образом будут "компилироваться" команды сравнения и выбора. Они всегда идут подряд парами - сначала сравнение аргумента с нулем Cmp*, затем выбор результата Phi. По сути это одна трехаргументная команда. Поэтому последовательность этих двух команд будет "компилироваться" в одну функцию. Для этого результатом "компиляции" Phi (которая в программе идет позже, поэтому "компилируется" раньше) будет не функция-продолжение, а структура с ее аргументами. При компиляции команды сравнения эта структура будет использоваться для создания функции, объединяющей в себе Cmp* и Phi.
Опишем тип результата псевдокомпиляции. Это или функция-продолжение, или аргументы Phi и ее продолжение:
type comp_res = Cont of (unit -> unit) | Phiargs of int * int * int * (unit -> unit);;
Псевдокомпиляция команды ВМ в такой результат делается просто:
let comp_instr ip vm ins nxt = match ins, nxt with | Add(r1,r2), Cont k -> Cont(fun () -> vm.mem.(ip) <- vm.mem.(r1) +. vm.mem.(r2); k ()) | Sub(r1,r2), Cont k -> Cont(fun () -> vm.mem.(ip) <- vm.mem.(r1) -. vm.mem.(r2); k ()) | Mul(r1,r2), Cont k -> Cont(fun () -> vm.mem.(ip) <- vm.mem.(r1) *. vm.mem.(r2); k ()) | Div(r1,r2), Cont k -> Cont(fun () -> let v2 = vm.mem.(r2) in vm.mem.(ip) <- if v2=0.0 then 0.0 else vm.mem.(r1) /. v2; k ()) | Out(r1,r2), Cont k -> Cont(fun () -> vm.outports.(r1) <- vm.mem.(r2); k ()) | Phi(r1,r2), Cont k -> Phiargs(r1, r2, ip, k) | Nop, Cont k -> Cont k | Cmpz(LTZ, r1), Phiargs(p1,p2, pip, k) -> Cont(fun () -> vm.mem.(pip) <- if vm.mem.(r1) < 0.0 then vm.mem.(p1) else vm.mem.(p2); k ()) | Cmpz(LEZ, r1), Phiargs(p1,p2, pip, k) -> Cont(fun () -> vm.mem.(pip) <- if vm.mem.(r1) <= 0.0 then vm.mem.(p1) else vm.mem.(p2); k ()) | Cmpz(EQZ, r1), Phiargs(p1,p2, pip, k) -> Cont(fun () -> vm.mem.(pip) <- if vm.mem.(r1) = 0.0 then vm.mem.(p1) else vm.mem.(p2); k ()) | Cmpz(GEZ, r1), Phiargs(p1,p2, pip, k) -> Cont(fun () -> vm.mem.(pip) <- if vm.mem.(r1) >= 0.0 then vm.mem.(p1) else vm.mem.(p2); k ()) | Cmpz(GTZ, r1), Phiargs(p1,p2, pip, k) -> Cont(fun () -> vm.mem.(pip) <- if vm.mem.(r1) > 0.0 then vm.mem.(p1) else vm.mem.(p2); k ()) | Sqrt r1, Cont k -> Cont(fun () -> vm.mem.(ip) <- if vm.mem.(r1) >= 0.0 then sqrt vm.mem.(r1) else 0.0; k ()) | Copy r1, Cont k -> Cont(fun () -> vm.mem.(ip) <- vm.mem.(r1); k ()) | Inp r1, Cont k -> Cont(fun () -> vm.mem.(ip) <- vm.inports.(r1); k ()) | _, _ -> Printf.printf "compile error: ip=%d\n" ip; failwith "comp_instr";;
А псевдокомпиляция всей программы будет выглядеть так:
let comp_prg vm = let rec loop ip nxt = if ip < 0 then nxt else let ci = comp_instr ip vm vm.prg.(ip) nxt in loop (ip-1) ci in match loop (vm.prglen-1) (Cont(fun ()-> vm.time <- vm.time + 1; if vm.outports.(0)>0.0 && vm.scoretime=0 then vm.scoretime <- vm.time)) with Cont k -> k | Phiargs _ -> failwith "comp_prg: Phiargs returned";;
Функция comp_prg получает на вход структуру ВМ, содержащую программу и память, а на выход выдает функцию, которая выполняет один проход программы данного экземпляра ВМ. Ее можно вызывать вместо interpret vm, и скорость получается уже 417 000 итераций в секунду, т.е. уже быстрее, чем реализация ВМ на С++. Но и это не предел.
Оптимизация вторая.
На каждой итерации ВМ читает какие-то данные из входных портов, производит вычисления, зависящие от них и от состояния памяти, пишет что-то в память и выдает что-то на выходные порты. В данном случае на входе были моментальные изменения скорости спутника, а на выходе его координаты, остаток топлива и другие данные. Большую часть времени спутник просто летит по орбите без использования двигателей, т.е. значения входных портов для большинства итераций одни и те же. Это можно использовать и убрать из цикла вычисления, зависящие от неизменяющихся данных, записав вместо них Nop'ы с уже посчитанным результатом. Алгоритм оптимизации очень прост. Заводим массив булевых значений, показывающих для каждой команды является ли ее результат константой при условии константности входов. Изначально это истина для команд Input и Nop и ложь для всех остальных. Дальше проходим по программе и для каждой операции кроме Output смотрим не являются ли константами ее аргументы. Если да, то вычисляем результат операции, записываем в нужную ячейку памяти, а команду заменяем на Nop, пометив ее как константу. Так делаем несколько раз: на каждом проходе число константных операций или увеличивается (когда заменили очередную команду) или не изменяется (когда больше уже нечего заменить). Если после прохода ничего не изменилось, значит мы попали в Fixed Point и оптимизация готова, возвращаем новый экземпляр ВМ с новой программой и новым содержимым памяти:
let optimize vm = let prg = Array.sub vm.prg 0 (vm.prglen) in let propagate cons mem p = let con = Array.copy cons in let p' = p |> Array.mapi (fun ip ins -> match ins with | Add(r1,r2) -> if con.(r1) && con.(r2) then (mem.(ip) <- mem.(r1) +. mem.(r2); con.(ip)<-true; Nop) else ins | Sub(r1,r2) -> if con.(r1) && con.(r2) then (mem.(ip) <- mem.(r1) -. mem.(r2); con.(ip)<-true; Nop) else ins | Mul(r1,r2) -> if con.(r1) && con.(r2) then (mem.(ip) <- mem.(r1) *. mem.(r2); con.(ip)<-true; Nop) else ins | Div(r1,r2) -> if con.(r1) && con.(r2) then (mem.(ip) <- if mem.(r2)=0.0 then 0.0 else mem.(r1) /. mem.(r2); con.(ip)<-true; Nop) else ins | Out(r1,r2) -> ins | Phi(r1,r2) -> if ip>0 then begin match prg.(ip-1) with | Cmpz _ -> if con.(ip-1) then if mem.(ip-1) > 0.0 then Copy r1 else Copy r2 else ins | _ -> ins end else ins | Nop -> ins | Cmpz(LTZ, r1) -> if con.(r1) then (mem.(ip) <- if mem.(r1) < 0.0 then 1.0 else 0.0; con.(ip)<-true; Nop) else ins | Cmpz(LEZ, r1) -> if con.(r1) then (mem.(ip) <- if mem.(r1) <= 0.0 then 1.0 else 0.0; con.(ip)<-true; Nop) else ins | Cmpz(EQZ, r1) -> if con.(r1) then (mem.(ip) <- if mem.(r1) = 0.0 then 1.0 else 0.0; con.(ip)<-true; Nop) else ins | Cmpz(GEZ, r1) -> if con.(r1) then (mem.(ip) <- if mem.(r1) >= 0.0 then 1.0 else 0.0; con.(ip)<-true; Nop) else ins | Cmpz(GTZ, r1) -> if con.(r1) then (mem.(ip) <- if mem.(r1) > 0.0 then 1.0 else 0.0; con.(ip)<-true; Nop) else ins | Sqrt r1 -> if con.(r1) then (mem.(ip) <- if mem.(r1) >= 0.0 then sqrt mem.(r1) else 0.0; con.(ip)<-true; Nop) else ins | Copy r1 -> if con.(r1) then (mem.(ip) <- mem.(r1); con.(ip)<-true; Nop) else ins | Inp r1 -> mem.(ip) <- vm.inports.(r1); con.(ip)<-true; Nop) in con, p' in let mem = Array.copy vm.mem in let rec loop p con = let con', p' = propagate con mem p in if con' = con && p = p' then p, con else loop p' con' in let const = prg |> Array.map (function Nop | Inp _ -> true | _ -> false) in let p, con = loop prg const in { prg = p; prglen = vm.prglen; mem = mem; inports = Array.copy vm.inports; outports = Array.copy vm.outports; status = vm.status; time = vm.time; scenario = vm.scenario; log = vm.log; scoretime = vm.scoretime };;
Такая оптимизация на первой задаче, например, увеличивает количество пустых команд с 11% до 39%. Оптимизированная программа подвергается описанной выше псевдокомпиляции, и скорость возрастает до 650 000 итераций в секунду, что в 1,7 раза быстрее варианта на С++ и в 2,5 раза быстрее исходного интепретатора на Окамле. Если наш спутник меняет траекторию, то в этом момент используется неоптимизированный вариант, а после того, как он ляжет на новую орбиту с выключенным двигателем, текущий экземпляр ВМ проходит оптимизацию и опять получаем быстрый ваприант уже с новыми значениями.
no subject
Date: 2009-07-07 10:39 am (UTC)Делается проход, который создает список объектов-команд с виртуальным методом Execute(), под которые создаются конкретные классы-команды.
Получаемый выигрыш -- один косвенный вызов vs два свича + битовые операции.