get rid of pipes

This commit is contained in:
Vincent Riquer 2013-04-03 16:15:56 +02:00
parent ec9201caa9
commit 7141787728

618
atom
View File

@ -1269,294 +1269,243 @@ encodeFile::vorbis() {
} }
worker() { worker() {
debug=2
trap "kill -USR1 $masterpid" EXIT trap "kill -USR1 $masterpid" EXIT
trap - USR1 ALRM PIPE trap - USR1 ALRM PIPE
exec 2>>"$tempdir/errors.log" exec 2>>"$tempdir/worker$1.log"
while : (( debug >= 2 )) && echo "${cmd_arg[@]}" >&2
do "${cmd_arg[@]}" >/dev/null
echo work status=$?
read -t 10 line kill -USR1 $masterpid
until [ -n "$line" ] && [[ $line != AtOM:ComFail ]] exit $status
do
echo work
read -t 10 line
done
if [[ $line == AtOM:Die ]]
then
trap EXIT
break
elif [[ $line == AtOM:Sleep ]]
then
sleep 1
continue
fi
taskid=${line%%|*}
rest="${line#*|}|"
sourcefileid=${rest%%|*}
rest=${rest#*|}
required=${rest%%|*}
rest=${rest#*|}
cmd_arg=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cleanup=${rest%%|*}
rest=${rest#*|}
destfileid=${rest%%|*}
rest=${rest#*|}
destfilename=${rest%%|*}
rest=${rest#*|}
for key in ${!cmd_arg[@]}
do
[ -z "${cmd_arg[key]}" ] && unset cmd_arg[key]
done
(( debug >= 2 )) && echo "${cmd_arg[@]}" >&2
if "${cmd_arg[@]}" >/dev/null
then
echo "finished $taskid|$sourcefileid|$destfileid|$destfilename"
read -t 10 line
until [[ $line == AtOM:OK ]]
do
echo "finished $taskid|$sourcefileid|$destfileid|$destfilename"
read -t 10 line
done
else
echo "failed $taskid"
read -t 10 line
until [[ $line == AtOM:OK ]]
do
echo "failed $taskid"
read -t 10 line
done
[ -n "$filename" ] \
&& eval rm -f $filename
fi
unset cmd_arg
if [ -n "$cleanup" -a -n "$required" ]
then
echo "cleanup $required"
read -t 10 answer
until [[ $answer != AtOM:ComFail ]]
do
echo "cleanup $required"
read -t 10 answer
done
if (( answer == 1 ))
then
eval rm -f $cleanup
fi
fi
done
return
} }
master() { master() {
for workerid in ${!workers[@]} if (( active >= concurrency)) || [ -n "$quit" ]
do then
if read -t0.001 -u$((200+workerid)) workercommand workerquery sleep 0.1
else
echo '
SELECT COUNT(*)
FROM tasks
WHERE status = 0;
SELECT COUNT(*)
FROM tasks
WHERE status = 0
AND requires is NULL;
SELECT
id,
source_file,
required,
cmd_arg0,
cmd_arg1,
cmd_arg2,
cmd_arg3,
cmd_arg4,
cmd_arg5,
cmd_arg6,
cmd_arg7,
cmd_arg8,
cmd_arg9,
cmd_arg10,
cmd_arg11,
cmd_arg12,
cmd_arg13,
cmd_arg14,
cmd_arg15,
cmd_arg16,
cmd_arg17,
cmd_arg18,
cmd_arg19,
cmd_arg20,
cmd_arg21,
cmd_arg22,
cmd_arg23,
cmd_arg24,
cmd_arg25,
cmd_arg26,
cmd_arg27,
cmd_arg28,
cmd_arg29,
cleanup,
fileid,
filename
FROM tasks
WHERE status = 0
AND requires is NULL
ORDER BY source_file
LIMIT 1;
' >&3
read -u4 remaining
read -u4 ready
if (( remaining == 0 ))
then then
break sleep 0.1
continue
elif (( ready == 0 ))
then
sleep 0.1
else
(( ++active ))
read -u4 line
taskid=${line%%|*}
rest="${line#*|}|"
sourcefileid=${rest%%|*}
rest=${rest#*|}
required=${rest%%|*}
rest=${rest#*|}
cmd_arg=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cleanup=${rest%%|*}
rest=${rest#*|}
destfileid=${rest%%|*}
rest=${rest#*|}
destfilename=${rest%%|*}
rest=${rest#*|}
for key in ${!cmd_arg[@]}
do
[ -z "${cmd_arg[key]}" ] && unset cmd_arg[key]
done
workerid=$(getworkerid)
workertasks[workerid]=$taskid
Update tasks status 1 <<<"id = $taskid"
export cmd_arg
createworker $workerid
unset cmd_arg
fi
fi
}
getworkerid() {
local i
for (( i=0 ; i >= 0 ; i++ ))
do
if [ -z "${workers[i]}" ]
then
echo $i
return
fi fi
done done
(( ${#workers[@]} == 0 )) && break # If we reach this, we have reached the signed long limit
if [ -n "$workercommand" ] # (2^63 - 1 = 9223372036854775807 - Got a supercomputer?)
then (( concurrency-- ))
case $workercommand in }
'work')
if [ -n "$quit" ]
then
destroyworker $workerid
elif [ -n "${workertasks[workerid]}" ]
then
echo '
SELECT
id,
source_file,
required,
cmd_arg0,
cmd_arg1,
cmd_arg2,
cmd_arg3,
cmd_arg4,
cmd_arg5,
cmd_arg6,
cmd_arg7,
cmd_arg8,
cmd_arg9,
cmd_arg10,
cmd_arg11,
cmd_arg12,
cmd_arg13,
cmd_arg14,
cmd_arg15,
cmd_arg16,
cmd_arg17,
cmd_arg18,
cmd_arg19,
cmd_arg20,
cmd_arg21,
cmd_arg22,
cmd_arg23,
cmd_arg24,
cmd_arg25,
cmd_arg26,
cmd_arg27,
cmd_arg28,
cmd_arg29,
cleanup,
fileid,
filename
FROM tasks
WHERE
id='${workertasks[workerid]}';
' >&3
read -u4 line
eval echo '"$line" >&'$((100+workerid))
elif (( active < concurrency ))
then
echo '
SELECT COUNT(*)
FROM tasks
WHERE status = 0;
SELECT COUNT(*) createworker() {
FROM tasks worker &
WHERE status = 0 workers[$1]=$!
AND requires is NULL; }
SELECT destroyworker() {
id, dyingworker=${workers[$1]}
source_file, unset workers[$1]
required, wait $dyingworker
cmd_arg0, }
cmd_arg1,
cmd_arg2, checkworkers() {
cmd_arg3, for key in ${!workers[@]}
cmd_arg4, do
cmd_arg5, if ! kill -0 ${workers[key]} 2>/dev/null
cmd_arg6, then
cmd_arg7, if read -u4 -t 0.01 alienquery
cmd_arg8, then
cmd_arg9, alienresults=(${alienquery%%|*})
cmd_arg10, rest="${alienquery#*|}|"
cmd_arg11, while [ -n "$rest" ]
cmd_arg12, do
cmd_arg13, alienresults+=("${rest%%|*}")
cmd_arg14, rest=${rest#*|}
cmd_arg15, done
cmd_arg16, fi
cmd_arg17, echo '
cmd_arg18, SELECT
cmd_arg19, id,
cmd_arg20, source_file,
cmd_arg21, required,
cmd_arg22, cleanup,
cmd_arg23, fileid,
cmd_arg24, filename
cmd_arg25, FROM tasks
cmd_arg26, WHERE
cmd_arg27, id='${workertasks[key]}';
cmd_arg28, ' >&3
cmd_arg29, read -u4 line
cleanup, taskid=${line%%|*}
fileid, rest="${line#*|}|"
filename sourcefileid=${rest%%|*}
FROM tasks rest=${rest#*|}
WHERE status = 0 required=${rest%%|*}
AND requires is NULL rest=${rest#*|}
ORDER BY source_file cleanup=${rest%%|*}
LIMIT 1; rest=${rest#*|}
' >&3 destfileid=${rest%%|*}
read -u4 remaining rest=${rest#*|}
read -u4 ready destfilename=${rest%%|*}
if (( remaining == 0 )) rest=${rest#*|}
then if destroyworker $key
destroyworker $workerid then
continue
elif (( ready == 0 ))
then
line=AtOM:Sleep
else
(( ++active ))
read -u4 line
taskid=${line%%|*}
workertasks[workerid]=$taskid
Update tasks status 1 <<<"id = $taskid"
fi
eval echo '"$line" >&'$((100+workerid))
else
destroyworker $workerid
fi
;;
'finished')
eval 'echo AtOM:OK >&'$((workerid+100))
[ -z "${workertasks[workerid]}" ] && continue
(( ++ran )) (( ++ran ))
unset workertasks[workerid]
(( active-- )) || true (( active-- )) || true
taskid=${workerquery%%|*}
rest="${workerquery#*|}|"
sourcefileid=${rest%%|*}
rest=${rest#*|}
destfileid=${rest%%|*}
rest=${rest#*|}
destfilename=${rest%%|*}
Delete tasks <<<"id = $taskid" Delete tasks <<<"id = $taskid"
if [ -n "$destfilename" ] if [ -n "$destfilename" ]
then then
@ -1581,15 +1530,7 @@ master() {
"WHERE id=$destfileid;" \ "WHERE id=$destfileid;" \
>&3 >&3
fi fi
;; else
'failed')
eval 'echo AtOM:OK >&'$((workerid+100))
[ -z "${workertasks[workerid]}" ] && continue
unset workertasks[workerid]
(( --active )) || true
(( ++failed ))
(( ++ran ))
taskid=$workerquery
faildepends=$( faildepends=$(
Select tasks 'COUNT(*)' <<-EOWhere Select tasks 'COUNT(*)' <<-EOWhere
requires = $taskid requires = $taskid
@ -1598,79 +1539,33 @@ master() {
(( failed+=faildepends )) (( failed+=faildepends ))
Update tasks status 2 <<<"id = $taskid" Update tasks status 2 <<<"id = $taskid"
Update tasks status 2 <<<"requires = $taskid" Update tasks status 2 <<<"requires = $taskid"
;;
'cleanup')
required=$workerquery
echo "SELECT COUNT(*)
FROM tasks
WHERE ( status = 0 OR status = 1 )
AND required = $required;">&3
read -u4 count
if (( count == 0 ))
then
eval echo 1 '>&'$((100+workerid))
else
eval echo 0 '>&'$((100+workerid))
fi
;;
*)
eval 'echo "AtOM:ComFail" >&'$((100+workerid))
;;
esac
fi
}
getworkerid() {
local i
for (( i=0 ; i < 100 ; i++ ))
do
if [ -z "${workers[i]}" ]
then
echo $i
break
fi
done
# If we reach this, we have reached the hardcoded 100 workers limit
(( concurrency-- ))
}
createworker() {
mkfifo "$tempdir"/worker$1{in,out}
worker $1 <"$tempdir"/worker$1in >"$tempdir"/worker$1out &
workers[$1]=$!
eval exec $((100+$1))'>"$tempdir"/worker$1in'
eval exec $((200+$1))'<"$tempdir"/worker$1out'
}
destroyworker() {
dyingworker=${workers[$1]}
unset workers[$1]
[ -z "$2" ] && eval echo AtOM:Die '>&'$((100+$1))
wait $dyingworker
eval $((100+$1))'>&-'
eval $((200+$1))'<&-'
rm "$tempdir"/worker$1{in,out}
}
checkworkers() {
for key in ${!workers[@]}
do
if ! kill -0 ${workers[key]} 2>/dev/null
then
destroyworker $key nokill
if [ -n "${workertasks[key]}" ]
then
faildepends=$(
Select tasks 'COUNT(*)' <<-EOWhere
requires = ${workertasks[key]}
EOWhere
)
(( ++failed ))
(( failed+=faildepends ))
Update tasks status 2 <<<"id = ${workertasks[key]}"
Update tasks status 2 <<<"requires = ${workertasks[key]}"
fi fi
createworker $key echo "SELECT COUNT(*)
FROM tasks
WHERE ( status = 0 OR status = 1 )
AND required = $taskid;">&3
read -u4 count
if (( count == 0 ))
then
rm -f "$cleanup"
fi
if (( ${#alienresults[@]} ))
then
alienquery='SELECT '
for key in ${!alienresults[@]}
do
(( key > 0 )) && alienquery+=,
expr='^[0-9]*$'
if [[ ${alienresults[key]} =~ $expr ]]
then
alienquery+=${alienresults[key]}
else
alienquery+="\"${alienresults[@]//\"/\"\"}\""
fi
done
echo "$alienquery;" >&3
unset alienquery alienresults
fi
fi fi
done done
} }
@ -2119,21 +2014,21 @@ echo -e "\rCreated ${count:-0} tasks for $filecount files (${copies:-0} immediat
masterpid=$$ masterpid=$$
trap checkworkers USR1 ALRM PIPE trap checkworkers USR1 ALRM PIPE
rm -f "$tempdir"/worker*
concurrency=$(( maxload / 2 )) concurrency=$(( maxload / 2 ))
(( concurrency )) || concurrency=1 (( concurrency )) || concurrency=1
active=0 active=0
#set -x
for (( i=0 ; i < concurrency ; i++ )) for (( i=0 ; i < concurrency ; i++ ))
do do
createworker $(getworkerid) master
done done
concurrencychange=$(date +%s) concurrencychange=$(date +%s)
starttime=$concurrencychange starttime=$concurrencychange
taskcount=$count taskcount=$count
failed=0 failed=0
while : while (( ${#workers[@]} ))
do do
if read -n 1 -t 0.01 userinput if read -n 1 -t 1 userinput
then then
case $userinput in case $userinput in
'+') '+')
@ -2160,7 +2055,6 @@ do
then then
concurrencychange=$(date +%s) concurrencychange=$(date +%s)
(( ++concurrency )) (( ++concurrency ))
createworker $(getworkerid)
fi fi
fi fi
master master