From 71417877284e30757391eb7f3b64b5bee7c8bcb6 Mon Sep 17 00:00:00 2001 From: Vincent Riquer Date: Wed, 3 Apr 2013 16:15:56 +0200 Subject: [PATCH] get rid of pipes --- atom | 618 +++++++++++++++++++++++++---------------------------------- 1 file changed, 256 insertions(+), 362 deletions(-) diff --git a/atom b/atom index eec547f..3c34fe0 100755 --- a/atom +++ b/atom @@ -1269,294 +1269,243 @@ encodeFile::vorbis() { } worker() { + debug=2 trap "kill -USR1 $masterpid" EXIT trap - USR1 ALRM PIPE - exec 2>>"$tempdir/errors.log" - while : - do - echo work - read -t 10 line - until [ -n "$line" ] && [[ $line != AtOM:ComFail ]] - do - echo work - read -t 10 line - done - if [[ $line == AtOM:Die ]] - then - trap EXIT - break - elif [[ $line == AtOM:Sleep ]] - then - sleep 1 - continue - fi - taskid=${line%%|*} - rest="${line#*|}|" - sourcefileid=${rest%%|*} - rest=${rest#*|} - required=${rest%%|*} - rest=${rest#*|} - cmd_arg=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cmd_arg+=("${rest%%|*}") - rest=${rest#*|} - cleanup=${rest%%|*} - rest=${rest#*|} - destfileid=${rest%%|*} - rest=${rest#*|} - destfilename=${rest%%|*} - rest=${rest#*|} - for key in ${!cmd_arg[@]} - do - [ -z "${cmd_arg[key]}" ] && unset cmd_arg[key] - done - (( debug >= 2 )) && echo "${cmd_arg[@]}" >&2 - if "${cmd_arg[@]}" >/dev/null - then - echo "finished $taskid|$sourcefileid|$destfileid|$destfilename" - read -t 10 line - until [[ $line == AtOM:OK ]] - do - echo "finished $taskid|$sourcefileid|$destfileid|$destfilename" - read -t 10 line - done - else - echo "failed $taskid" - read -t 10 line - until [[ $line == AtOM:OK ]] - do - echo "failed $taskid" - read -t 10 line - done - [ -n "$filename" ] \ - && eval rm -f $filename - fi - unset cmd_arg - if [ -n "$cleanup" -a -n "$required" ] - then - echo "cleanup $required" - read -t 10 answer - until [[ $answer != AtOM:ComFail ]] - do - echo "cleanup $required" - read -t 10 answer - done - if (( answer == 1 )) - then - eval rm -f $cleanup - fi - fi - done - return + exec 2>>"$tempdir/worker$1.log" + (( debug >= 2 )) && echo "${cmd_arg[@]}" >&2 + "${cmd_arg[@]}" >/dev/null + status=$? + kill -USR1 $masterpid + exit $status } master() { - for workerid in ${!workers[@]} - do - if read -t0.001 -u$((200+workerid)) workercommand workerquery + if (( active >= concurrency)) || [ -n "$quit" ] + then + sleep 0.1 + else + echo ' + SELECT COUNT(*) + FROM tasks + WHERE status = 0; + + SELECT COUNT(*) + FROM tasks + WHERE status = 0 + AND requires is NULL; + + SELECT + id, + source_file, + required, + cmd_arg0, + cmd_arg1, + cmd_arg2, + cmd_arg3, + cmd_arg4, + cmd_arg5, + cmd_arg6, + cmd_arg7, + cmd_arg8, + cmd_arg9, + cmd_arg10, + cmd_arg11, + cmd_arg12, + cmd_arg13, + cmd_arg14, + cmd_arg15, + cmd_arg16, + cmd_arg17, + cmd_arg18, + cmd_arg19, + cmd_arg20, + cmd_arg21, + cmd_arg22, + cmd_arg23, + cmd_arg24, + cmd_arg25, + cmd_arg26, + cmd_arg27, + cmd_arg28, + cmd_arg29, + cleanup, + fileid, + filename + FROM tasks + WHERE status = 0 + AND requires is NULL + ORDER BY source_file + LIMIT 1; + ' >&3 + read -u4 remaining + read -u4 ready + if (( remaining == 0 )) then - break + sleep 0.1 + continue + elif (( ready == 0 )) + then + sleep 0.1 + else + (( ++active )) + read -u4 line + taskid=${line%%|*} + rest="${line#*|}|" + sourcefileid=${rest%%|*} + rest=${rest#*|} + required=${rest%%|*} + rest=${rest#*|} + cmd_arg=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cmd_arg+=("${rest%%|*}") + rest=${rest#*|} + cleanup=${rest%%|*} + rest=${rest#*|} + destfileid=${rest%%|*} + rest=${rest#*|} + destfilename=${rest%%|*} + rest=${rest#*|} + for key in ${!cmd_arg[@]} + do + [ -z "${cmd_arg[key]}" ] && unset cmd_arg[key] + done + workerid=$(getworkerid) + workertasks[workerid]=$taskid + Update tasks status 1 <<<"id = $taskid" + export cmd_arg + createworker $workerid + unset cmd_arg + fi + fi +} + +getworkerid() { + local i + for (( i=0 ; i >= 0 ; i++ )) + do + if [ -z "${workers[i]}" ] + then + echo $i + return fi done - (( ${#workers[@]} == 0 )) && break - if [ -n "$workercommand" ] - then - case $workercommand in - 'work') - if [ -n "$quit" ] - then - destroyworker $workerid - elif [ -n "${workertasks[workerid]}" ] - then - echo ' - SELECT - id, - source_file, - required, - cmd_arg0, - cmd_arg1, - cmd_arg2, - cmd_arg3, - cmd_arg4, - cmd_arg5, - cmd_arg6, - cmd_arg7, - cmd_arg8, - cmd_arg9, - cmd_arg10, - cmd_arg11, - cmd_arg12, - cmd_arg13, - cmd_arg14, - cmd_arg15, - cmd_arg16, - cmd_arg17, - cmd_arg18, - cmd_arg19, - cmd_arg20, - cmd_arg21, - cmd_arg22, - cmd_arg23, - cmd_arg24, - cmd_arg25, - cmd_arg26, - cmd_arg27, - cmd_arg28, - cmd_arg29, - cleanup, - fileid, - filename - FROM tasks - WHERE - id='${workertasks[workerid]}'; - ' >&3 - read -u4 line - eval echo '"$line" >&'$((100+workerid)) - elif (( active < concurrency )) - then - echo ' - SELECT COUNT(*) - FROM tasks - WHERE status = 0; + # If we reach this, we have reached the signed long limit + # (2^63 - 1 = 9223372036854775807 - Got a supercomputer?) + (( concurrency-- )) +} - SELECT COUNT(*) - FROM tasks - WHERE status = 0 - AND requires is NULL; +createworker() { + worker & + workers[$1]=$! +} - SELECT - id, - source_file, - required, - cmd_arg0, - cmd_arg1, - cmd_arg2, - cmd_arg3, - cmd_arg4, - cmd_arg5, - cmd_arg6, - cmd_arg7, - cmd_arg8, - cmd_arg9, - cmd_arg10, - cmd_arg11, - cmd_arg12, - cmd_arg13, - cmd_arg14, - cmd_arg15, - cmd_arg16, - cmd_arg17, - cmd_arg18, - cmd_arg19, - cmd_arg20, - cmd_arg21, - cmd_arg22, - cmd_arg23, - cmd_arg24, - cmd_arg25, - cmd_arg26, - cmd_arg27, - cmd_arg28, - cmd_arg29, - cleanup, - fileid, - filename - FROM tasks - WHERE status = 0 - AND requires is NULL - ORDER BY source_file - LIMIT 1; - ' >&3 - read -u4 remaining - read -u4 ready - if (( remaining == 0 )) - then - destroyworker $workerid - continue - elif (( ready == 0 )) - then - line=AtOM:Sleep - else - (( ++active )) - read -u4 line - taskid=${line%%|*} - workertasks[workerid]=$taskid - Update tasks status 1 <<<"id = $taskid" - fi - eval echo '"$line" >&'$((100+workerid)) - else - destroyworker $workerid - fi - ;; - 'finished') - eval 'echo AtOM:OK >&'$((workerid+100)) - [ -z "${workertasks[workerid]}" ] && continue +destroyworker() { + dyingworker=${workers[$1]} + unset workers[$1] + wait $dyingworker +} + +checkworkers() { + for key in ${!workers[@]} + do + if ! kill -0 ${workers[key]} 2>/dev/null + then + if read -u4 -t 0.01 alienquery + then + alienresults=(${alienquery%%|*}) + rest="${alienquery#*|}|" + while [ -n "$rest" ] + do + alienresults+=("${rest%%|*}") + rest=${rest#*|} + done + fi + echo ' + SELECT + id, + source_file, + required, + cleanup, + fileid, + filename + FROM tasks + WHERE + id='${workertasks[key]}'; + ' >&3 + read -u4 line + taskid=${line%%|*} + rest="${line#*|}|" + sourcefileid=${rest%%|*} + rest=${rest#*|} + required=${rest%%|*} + rest=${rest#*|} + cleanup=${rest%%|*} + rest=${rest#*|} + destfileid=${rest%%|*} + rest=${rest#*|} + destfilename=${rest%%|*} + rest=${rest#*|} + if destroyworker $key + then (( ++ran )) - unset workertasks[workerid] (( active-- )) || true - taskid=${workerquery%%|*} - rest="${workerquery#*|}|" - sourcefileid=${rest%%|*} - rest=${rest#*|} - destfileid=${rest%%|*} - rest=${rest#*|} - destfilename=${rest%%|*} Delete tasks <<<"id = $taskid" if [ -n "$destfilename" ] then @@ -1581,15 +1530,7 @@ master() { "WHERE id=$destfileid;" \ >&3 fi - ;; - 'failed') - eval 'echo AtOM:OK >&'$((workerid+100)) - [ -z "${workertasks[workerid]}" ] && continue - unset workertasks[workerid] - (( --active )) || true - (( ++failed )) - (( ++ran )) - taskid=$workerquery + else faildepends=$( Select tasks 'COUNT(*)' <<-EOWhere requires = $taskid @@ -1598,79 +1539,33 @@ master() { (( failed+=faildepends )) Update tasks status 2 <<<"id = $taskid" Update tasks status 2 <<<"requires = $taskid" - ;; - 'cleanup') - required=$workerquery - echo "SELECT COUNT(*) - FROM tasks - WHERE ( status = 0 OR status = 1 ) - AND required = $required;">&3 - read -u4 count - if (( count == 0 )) - then - eval echo 1 '>&'$((100+workerid)) - else - eval echo 0 '>&'$((100+workerid)) - fi - ;; - *) - eval 'echo "AtOM:ComFail" >&'$((100+workerid)) - ;; - esac - fi -} - -getworkerid() { - local i - for (( i=0 ; i < 100 ; i++ )) - do - if [ -z "${workers[i]}" ] - then - echo $i - break - fi - done - # If we reach this, we have reached the hardcoded 100 workers limit - (( concurrency-- )) -} - -createworker() { - mkfifo "$tempdir"/worker$1{in,out} - worker $1 <"$tempdir"/worker$1in >"$tempdir"/worker$1out & - workers[$1]=$! - eval exec $((100+$1))'>"$tempdir"/worker$1in' - eval exec $((200+$1))'<"$tempdir"/worker$1out' -} - -destroyworker() { - dyingworker=${workers[$1]} - unset workers[$1] - [ -z "$2" ] && eval echo AtOM:Die '>&'$((100+$1)) - wait $dyingworker - eval $((100+$1))'>&-' - eval $((200+$1))'<&-' - rm "$tempdir"/worker$1{in,out} -} - -checkworkers() { - for key in ${!workers[@]} - do - if ! kill -0 ${workers[key]} 2>/dev/null - then - destroyworker $key nokill - if [ -n "${workertasks[key]}" ] - then - faildepends=$( - Select tasks 'COUNT(*)' <<-EOWhere - requires = ${workertasks[key]} - EOWhere - ) - (( ++failed )) - (( failed+=faildepends )) - Update tasks status 2 <<<"id = ${workertasks[key]}" - Update tasks status 2 <<<"requires = ${workertasks[key]}" fi - createworker $key + echo "SELECT COUNT(*) + FROM tasks + WHERE ( status = 0 OR status = 1 ) + AND required = $taskid;">&3 + read -u4 count + if (( count == 0 )) + then + rm -f "$cleanup" + fi + if (( ${#alienresults[@]} )) + then + alienquery='SELECT ' + for key in ${!alienresults[@]} + do + (( key > 0 )) && alienquery+=, + expr='^[0-9]*$' + if [[ ${alienresults[key]} =~ $expr ]] + then + alienquery+=${alienresults[key]} + else + alienquery+="\"${alienresults[@]//\"/\"\"}\"" + fi + done + echo "$alienquery;" >&3 + unset alienquery alienresults + fi fi done } @@ -2119,21 +2014,21 @@ echo -e "\rCreated ${count:-0} tasks for $filecount files (${copies:-0} immediat masterpid=$$ trap checkworkers USR1 ALRM PIPE -rm -f "$tempdir"/worker* concurrency=$(( maxload / 2 )) (( concurrency )) || concurrency=1 active=0 +#set -x for (( i=0 ; i < concurrency ; i++ )) do - createworker $(getworkerid) + master done concurrencychange=$(date +%s) starttime=$concurrencychange taskcount=$count failed=0 -while : +while (( ${#workers[@]} )) do - if read -n 1 -t 0.01 userinput + if read -n 1 -t 1 userinput then case $userinput in '+') @@ -2160,7 +2055,6 @@ do then concurrencychange=$(date +%s) (( ++concurrency )) - createworker $(getworkerid) fi fi master