thedeemon: (office)
[personal profile] thedeemon
Несколько дней назад закончилось соревнование ICFP Contest 2009, мой небольшой отчет об участии в котором можно почитать здесь. Там в задании была описана несложная виртуальная машина, на которой запускались данные организаторами программы для моделирования орбитальной механики. Тут я расскажу, как, используя возможности функционального языка программирования, можно заметно ускорить интерпретатор.

Update: Это НЕ попытка сравнить скорости С++ и Окамла. Это НЕ попытка описать самый быстрый подход к интерпретации. Это просто техническая часть отчета об ICFPC с рассказом о примененном мною там приеме.
ВМ имеет память из 16384 вещественных чисел и программу не большей длины. Программа состоит из последовательности команд, выполняющихся по порядку, без ветвлений. Команды бывают без аргументов, с одним или с двумя аргументами. Для большинства команд результат выполнения (вещественное число) записывается в ячейку памяти с номером, равным номеру команды в программе. Аргументы берутся из памяти, их адреса закодированы в одном двойном слове с командой. Есть одноаргументные операции сравнения с нулем, их результат (истина или ложь) сохраняется в неком регистре статуса, который потом используется в команде выбора, записывающей в свою ячейку содержимое либо первого аргумента, либо второго, в зависимости от текущего значения статуса.
С учетом способа кодирования инструкций и аргументов, интерпретатор этой ВМ на С++ выглядит так:

  void run()
  {
    for(int ip=0; ip<prglen; ip++)   {
      const int dop = prg[ip]>>28;
      const int dr1 = (prg[ip] >> 14) & 0x3FFF; 
      const int dr2 = prg[ip] & 0x3FFF;
      switch(dop) {
        case 1: mem[ip] = mem[dr1] + mem[dr2]; break;
        case 2: mem[ip] = mem[dr1] - mem[dr2]; break;
        case 3: mem[ip] = mem[dr1] * mem[dr2]; break;
        case 4: mem[ip] = (mem[dr2]==0.0) ? 0.0 : mem[dr1] / mem[dr2]; break;
        case 5: out[dr1] = mem[dr2]; break;
        case 6: mem[ip] = status ? mem[dr1] : mem[dr2]; break;
        case 0:  { 
            const int sop = prg[ip] >> 24;
            const int imm = (prg[ip] >> 21) & 7;
            const int sr1 = dr2;
            switch(sop) {          
              case 0: break;
              case 1: switch(imm) {
                    case 0: status = mem[sr1] < 0; break;
                    case 1: status = mem[sr1] <= 0; break;
                    case 2: status = mem[sr1] == 0; break; 
                    case 3: status = mem[sr1] >= 0; break; 
                    case 4: status = mem[sr1] > 0; break; 
                    default: printf("bad imm=%d ip=%d\n", imm, ip); return;
                  } break;
              case 2: mem[ip] = sqrt(mem[sr1]); break;
              case 3: mem[ip] = mem[sr1]; break;
              case 4: mem[ip] = inp[sr1]; break;
              default: printf("bad sop=%d ip=%d\n", sop, ip); return;
            }
            }
            break;
        default: printf("bad dop=%d ip=%d\n", dop, ip); return;
      } 
    }
  }

Такая реализация на моей рабочей машинке 2,33 ГГц выполняет первую программу из 266 команд 377 000 раз в секунду. Но при участии в ICPFC я все писал на OCaml'e. На нем систему команд ВМ я описал в виде алгебраического типа, расшифровку команд и аргументов вынес отдельно, и интерпретатор выглядел так:

let eval_instr ip vm = function
  | Add(r1,r2) -> vm.mem.(ip) <- vm.mem.(r1) +. vm.mem.(r2)
  | Sub(r1,r2) -> vm.mem.(ip) <- vm.mem.(r1) -. vm.mem.(r2) 
  | Mul(r1,r2) -> vm.mem.(ip) <- vm.mem.(r1) *. vm.mem.(r2)
  | Div(r1,r2) -> let v2 = vm.mem.(r2) in vm.mem.(ip) <- if v2=0.0 then 0.0 else vm.mem.(r1) /. v2 
  | Out(r1,r2) -> vm.outports.(r1) <- vm.mem.(r2)
  | Phi(r1,r2) -> vm.mem.(ip) <- if vm.status then vm.mem.(r1) else vm.mem.(r2)
  | Nop -> ()
  | Cmpz(LTZ, r1) -> vm.status <- vm.mem.(r1) < 0.0
  | Cmpz(LEZ, r1) -> vm.status <- vm.mem.(r1) <= 0.0
  | Cmpz(EQZ, r1) -> vm.status <- vm.mem.(r1) = 0.0
  | Cmpz(GEZ, r1) -> vm.status <- vm.mem.(r1) >= 0.0
  | Cmpz(GTZ, r1) -> vm.status <- vm.mem.(r1) > 0.0
  | Sqrt r1 -> vm.mem.(ip) <- if vm.mem.(r1) >= 0.0 then sqrt vm.mem.(r1) else 0.0
  | Copy r1 -> vm.mem.(ip) <- vm.mem.(r1)
  | Inp r1 -> vm.mem.(ip) <- vm.inports.(r1);;

let interpret vm = 
  for ip = 0 to vm.prglen - 1 do
    eval_instr ip vm vm.prg.(ip)
  done;
  vm.time <- vm.time + 1;
  if vm.outports.(0)>0.0 && vm.scoretime=0 then vm.scoretime <- vm.time;;


Тут после собственно исполнения программы идет увеличение счетчика времени и проверка на выполнение задания, в варианте на С++ их нет. Такой интепретатор на Окамле наглядней, меньше подвержен ошибкам, и написался очень быстро, но работал несколько медленнее: только 263 000 итераций в секунду. Захотелось его ускорить, и были сделаны две оптимизации.

Оптимизация первая.
В исходном виде интерпретатора у нас в цикле крутится switch. Но от него можно избавиться, вынеся его за пределы цикла. Для этого мы каждую команду программы превратим в функцию. Она будет выполнять предписанное командой действие и передавать управление следующей такой функции. Вместо пробегания по программе в цикле мы будем вызывать функцию для первой команды, она - функцию для второй и т.д. При "компиляции" каждой команды нам нужно иметь уже "скомпилированную" функцию для следующей, поэтому эту псевдокомпиляцию будем проводить с конца программы. Самой последней команде в качестве такой функции-продолжения мы дадим функцию, выполняющую действия, которые нужно сделать в конце каждой итерации - увеличение счетчика времени и проверка очков. У ВМ есть особая инструкция Noop (у меня она обозначена более привычно Nop), которая ничего не делает. Для нее новой функции создаваться не будет, а просто полученное продолжение будет передаваться дальше. Благодаря этому все nop'ы и их последовательности будут вообще бесплатны. Отдельным образом будут "компилироваться" команды сравнения и выбора. Они всегда идут подряд парами - сначала сравнение аргумента с нулем Cmp*, затем выбор результата Phi. По сути это одна трехаргументная команда. Поэтому последовательность этих двух команд будет "компилироваться" в одну функцию. Для этого результатом "компиляции" Phi (которая в программе идет позже, поэтому "компилируется" раньше) будет не функция-продолжение, а структура с ее аргументами. При компиляции команды сравнения эта структура будет использоваться для создания функции, объединяющей в себе Cmp* и Phi.
Опишем тип результата псевдокомпиляции. Это или функция-продолжение, или аргументы Phi и ее продолжение:

type comp_res = Cont of (unit -> unit) | Phiargs of int * int * int * (unit -> unit);;   

Псевдокомпиляция команды ВМ в такой результат делается просто:

let comp_instr ip vm ins nxt = 
  match ins, nxt with
  | Add(r1,r2), Cont k -> Cont(fun () -> vm.mem.(ip) <- vm.mem.(r1) +. vm.mem.(r2); k ())    
  | Sub(r1,r2), Cont k -> Cont(fun () -> vm.mem.(ip) <- vm.mem.(r1) -. vm.mem.(r2); k ())    
  | Mul(r1,r2), Cont k -> Cont(fun () -> vm.mem.(ip) <- vm.mem.(r1) *. vm.mem.(r2); k ())    
  | Div(r1,r2), Cont k -> Cont(fun () -> 
      let v2 = vm.mem.(r2) in vm.mem.(ip) <- if v2=0.0 then 0.0 else vm.mem.(r1) /. v2; k ())  
  | Out(r1,r2), Cont k -> Cont(fun () -> vm.outports.(r1) <- vm.mem.(r2); k ())    
  | Phi(r1,r2), Cont k -> Phiargs(r1, r2, ip, k)
  | Nop, Cont k -> Cont k 
  | Cmpz(LTZ, r1), Phiargs(p1,p2, pip, k) -> Cont(fun () -> 
      vm.mem.(pip) <- if vm.mem.(r1) < 0.0 then vm.mem.(p1) else vm.mem.(p2); k ())
  | Cmpz(LEZ, r1), Phiargs(p1,p2, pip, k) -> Cont(fun () -> 
      vm.mem.(pip) <- if vm.mem.(r1) <= 0.0 then vm.mem.(p1) else vm.mem.(p2); k ())
  | Cmpz(EQZ, r1), Phiargs(p1,p2, pip, k) -> Cont(fun () -> 
      vm.mem.(pip) <- if vm.mem.(r1) = 0.0 then vm.mem.(p1) else vm.mem.(p2); k ())
  | Cmpz(GEZ, r1), Phiargs(p1,p2, pip, k) -> Cont(fun () -> 
      vm.mem.(pip) <- if vm.mem.(r1) >= 0.0 then vm.mem.(p1) else vm.mem.(p2); k ())
  | Cmpz(GTZ, r1), Phiargs(p1,p2, pip, k) -> Cont(fun () -> 
      vm.mem.(pip) <- if vm.mem.(r1) > 0.0 then vm.mem.(p1) else vm.mem.(p2); k ())
  | Sqrt r1, Cont k -> Cont(fun () -> vm.mem.(ip) <- if vm.mem.(r1) >= 0.0 then sqrt vm.mem.(r1) else 0.0; k ())
  | Copy r1, Cont k -> Cont(fun () -> vm.mem.(ip) <- vm.mem.(r1); k ())
  | Inp r1,  Cont k -> Cont(fun () -> vm.mem.(ip) <- vm.inports.(r1); k ())
  | _, _ -> Printf.printf "compile error: ip=%d\n" ip; failwith "comp_instr";;  

А псевдокомпиляция всей программы будет выглядеть так:

let comp_prg vm = 
  let rec loop ip nxt = 
    if ip < 0 then nxt else
    let ci = comp_instr ip vm vm.prg.(ip) nxt in
    loop (ip-1) ci  in
  match loop (vm.prglen-1) (Cont(fun ()-> 
    vm.time <- vm.time + 1;  if vm.outports.(0)>0.0 && vm.scoretime=0 then vm.scoretime <- vm.time)) 
  with Cont k -> k  | Phiargs _ -> failwith "comp_prg: Phiargs returned";;  

Функция comp_prg получает на вход структуру ВМ, содержащую программу и память, а на выход выдает функцию, которая выполняет один проход программы данного экземпляра ВМ. Ее можно вызывать вместо interpret vm, и скорость получается уже 417 000 итераций в секунду, т.е. уже быстрее, чем реализация ВМ на С++. Но и это не предел.

Оптимизация вторая.
На каждой итерации ВМ читает какие-то данные из входных портов, производит вычисления, зависящие от них и от состояния памяти, пишет что-то в память и выдает что-то на выходные порты. В данном случае на входе были моментальные изменения скорости спутника, а на выходе его координаты, остаток топлива и другие данные. Большую часть времени спутник просто летит по орбите без использования двигателей, т.е. значения входных портов для большинства итераций одни и те же. Это можно использовать и убрать из цикла вычисления, зависящие от неизменяющихся данных, записав вместо них Nop'ы с уже посчитанным результатом. Алгоритм оптимизации очень прост. Заводим массив булевых значений, показывающих для каждой команды является ли ее результат константой при условии константности входов. Изначально это истина для команд Input и Nop и ложь для всех остальных. Дальше проходим по программе и для каждой операции кроме Output смотрим не являются ли константами ее аргументы. Если да, то вычисляем результат операции, записываем в нужную ячейку памяти, а команду заменяем на Nop, пометив ее как константу. Так делаем несколько раз: на каждом проходе число константных операций или увеличивается (когда заменили очередную команду) или не изменяется (когда больше уже нечего заменить). Если после прохода ничего не изменилось, значит мы попали в Fixed Point и оптимизация готова, возвращаем новый экземпляр ВМ с новой программой и новым содержимым памяти:

let optimize vm = 
  let prg = Array.sub vm.prg 0 (vm.prglen) in  
  let propagate cons mem p = 
    let con = Array.copy cons in
    let p' = p |> Array.mapi (fun ip ins ->    
      match ins with 
      | Add(r1,r2) -> if con.(r1) && con.(r2) then (mem.(ip) <- mem.(r1) +. mem.(r2); con.(ip)<-true; Nop) else ins 
      | Sub(r1,r2) -> if con.(r1) && con.(r2) then (mem.(ip) <- mem.(r1) -. mem.(r2); con.(ip)<-true; Nop) else ins
      | Mul(r1,r2) -> if con.(r1) && con.(r2) then (mem.(ip) <- mem.(r1) *. mem.(r2); con.(ip)<-true; Nop) else ins
      | Div(r1,r2) -> if con.(r1) && con.(r2) then 
                        (mem.(ip) <- if mem.(r2)=0.0 then 0.0 else mem.(r1) /. mem.(r2); con.(ip)<-true; Nop) else ins 
      | Out(r1,r2) -> ins
      | Phi(r1,r2) -> if ip>0 then begin
                        match prg.(ip-1) with 
                        | Cmpz _ ->  if con.(ip-1) then
                                      if mem.(ip-1) > 0.0 then Copy r1 else Copy r2
                                    else ins
                        | _ -> ins
                      end  else ins
      | Nop -> ins
      | Cmpz(LTZ, r1) -> if con.(r1) then (mem.(ip) <- if mem.(r1) < 0.0 then 1.0 else 0.0; con.(ip)<-true; Nop) else ins
      | Cmpz(LEZ, r1) -> if con.(r1) then (mem.(ip) <- if mem.(r1) <= 0.0 then 1.0 else 0.0; con.(ip)<-true; Nop) else ins
      | Cmpz(EQZ, r1) -> if con.(r1) then (mem.(ip) <- if mem.(r1) = 0.0 then 1.0 else 0.0; con.(ip)<-true; Nop) else ins
      | Cmpz(GEZ, r1) -> if con.(r1) then (mem.(ip) <- if mem.(r1) >= 0.0 then 1.0 else 0.0; con.(ip)<-true; Nop) else ins
      | Cmpz(GTZ, r1) -> if con.(r1) then (mem.(ip) <- if mem.(r1) > 0.0 then 1.0 else 0.0; con.(ip)<-true; Nop) else ins
      | Sqrt r1 -> if con.(r1) then (mem.(ip) <- if mem.(r1) >= 0.0 then sqrt mem.(r1) else 0.0; con.(ip)<-true; Nop) else ins
      | Copy r1 -> if con.(r1) then (mem.(ip) <- mem.(r1); con.(ip)<-true; Nop) else ins
      | Inp r1 -> mem.(ip) <- vm.inports.(r1); con.(ip)<-true; Nop) in
    con, p'   in
  let mem = Array.copy vm.mem in
  let rec loop p con =
    let con', p' = propagate con mem p in
    if con' = con && p = p' then p, con else loop p' con'  in 
  let const = prg |> Array.map (function Nop | Inp _ -> true | _ -> false) in
  let p, con = loop prg const  in
  { prg = p; prglen = vm.prglen; mem = mem; 
    inports = Array.copy vm.inports; outports = Array.copy vm.outports; status = vm.status;
    time = vm.time; scenario = vm.scenario; log = vm.log; scoretime = vm.scoretime  };;      

Такая оптимизация на первой задаче, например, увеличивает количество пустых команд с 11% до 39%. Оптимизированная программа подвергается описанной выше псевдокомпиляции, и скорость возрастает до 650 000 итераций в секунду, что в 1,7 раза быстрее варианта на С++ и в 2,5 раза быстрее исходного интепретатора на Окамле. Если наш спутник меняет траекторию, то в этом момент используется неоптимизированный вариант, а после того, как он ляжет на новую орбиту с выключенным двигателем, текущий экземпляр ВМ проходит оптимизацию и опять получаем быстрый ваприант уже с новыми значениями.

Date: 2009-07-11 08:12 am (UTC)
From: [identity profile] thedeemon.livejournal.com
"fun () -> ..." это просто запись безымянной функции без параметров. Совсем без параметров в математике функций не бывает, поэтому в языке введен специальный тип unit (аналог void), значение этого типа обозначается (). Т.е. match'a здесь нет, т.к. параметр фиктивный, ничего на самом деле не передается.

Цепочек Cont там не получается. Разберем один шаг псевдокомпиляции на примере:
let comp_instr ip vm ins nxt =
match ins, nxt with
тут смотрим, чему равны текущая инструкция ins и переданный нам результат псевдокомпиляции остатка программы (т.к. идем с конца) nxt. Допустим, ins - инструкция сложения, а nxt - собранное продолжение в виде Cont k. nxt имеет тип comp_res и содержит тег Cont и данные k. k имеет тип "функция из воида в воид" (unit -> unit).

| Add(r1,r2), Cont k ->
попадаем в эту ветку, заодно получив значения r1, r2 и k, отбросив теги Add и Cont.
Создаем новую функцию, которая не имеет параметров (т.е. имеет один фиктивный - () ) производит нужное действие (сложение и запись) и вызывает k:
fun () -> vm.mem.(ip) <- vm.mem.(r1) +. vm.mem.(r2); k ()
Здесь ";" - разделитель между двумя последовательными действиями (как в С, только в конце он не ставится, т.к. он именно разделитель, а не завершитель). Функция эта возвращает то, что вернет k, и т.к. это последнее действие в ней, то вместо call будет jump. Но тип k известен, она ничего не возвращает, т.е. возвращает все то же фиктивное (). Тип построенной безымянной функции в результате тоже unit->unit и ее мы в качестве данных оборачиваем с тегом Cont, получив значение типа comp_res, которое и возвращаем. При псевдокомпиляции следующей команды этот тег Cont будет отброшен, а использована только сама построенная только что функция, она станет k.

Исходник ВМ, который использовался в соревновании:
с подсветкой http://stuff.thedeemon.com/orbitvm.html
оригинал http://stuff.thedeemon.com/orbitvm.ml

Компилял версией OCaml 3.10.2 под Windows/MSVC.

Date: 2009-07-11 10:15 am (UTC)
From: [identity profile] snaury.livejournal.com
А! Так вот где я ошибся. Я почему-то подумал, что unit - это сокращение от чего-то вроде unit of execution, и думал что unit -> unit - это функция, которая на вход получает функцию и возвращает функцию, а () - это пустая функция. %)

Теперь всё стало понятно, спасибо...

Profile

thedeemon: (Default)
Dmitry Popov

December 2025

S M T W T F S
 12 3456
789101112 13
14151617181920
21222324252627
28293031   

Most Popular Tags

Style Credit

Expand Cut Tags

No cut tags
Page generated Jan. 29th, 2026 12:37 pm
Powered by Dreamwidth Studios