Merge branch 'no-pipes'

* no-pipes:
  small fixes
  no SQL inside the signal-handler (race-condition)
  get rid of pipes
This commit is contained in:
Vincent Riquer 2013-04-04 16:51:33 +02:00
commit 44e88068e5

632
atom
View File

@ -1267,387 +1267,281 @@ encodeFile::vorbis() {
} }
worker() { worker() {
trap "kill -USR1 $masterpid" EXIT exec 2>>"$tempdir/worker$1.log"
trap - USR1 ALRM PIPE (( debug >= 2 )) && echo "${cmd_arg[@]}" >&2
exec 2>>"$tempdir/errors.log" "${cmd_arg[@]}" >/dev/null
while :
do
echo work
read -t 10 line
until [ -n "$line" ] && [[ $line != AtOM:ComFail ]]
do
echo work
read -t 10 line
done
if [[ $line == AtOM:Die ]]
then
trap EXIT
break
elif [[ $line == AtOM:Sleep ]]
then
sleep 1
continue
fi
taskid=${line%%|*}
rest="${line#*|}|"
sourcefileid=${rest%%|*}
rest=${rest#*|}
required=${rest%%|*}
rest=${rest#*|}
cmd_arg=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cmd_arg+=("${rest%%|*}")
rest=${rest#*|}
cleanup=${rest%%|*}
rest=${rest#*|}
destfileid=${rest%%|*}
rest=${rest#*|}
destfilename=${rest%%|*}
rest=${rest#*|}
for key in ${!cmd_arg[@]}
do
[ -z "${cmd_arg[key]}" ] && unset cmd_arg[key]
done
(( debug >= 2 )) && echo "${cmd_arg[@]}" >&2
if "${cmd_arg[@]}" >/dev/null
then
echo "finished $taskid|$sourcefileid|$destfileid|$destfilename"
read -t 10 line
until [[ $line == AtOM:OK ]]
do
echo "finished $taskid|$sourcefileid|$destfileid|$destfilename"
read -t 10 line
done
else
echo "failed $taskid"
read -t 10 line
until [[ $line == AtOM:OK ]]
do
echo "failed $taskid"
read -t 10 line
done
[ -n "$filename" ] \
&& eval rm -f $filename
fi
unset cmd_arg
if [ -n "$cleanup" -a -n "$required" ]
then
echo "cleanup $required"
read -t 10 answer
until [[ $answer != AtOM:ComFail ]]
do
echo "cleanup $required"
read -t 10 answer
done
if (( answer == 1 ))
then
eval rm -f $cleanup
fi
fi
done
return
} }
master() { master() {
for workerid in ${!workers[@]} if (( active >= concurrency)) || [ -n "$quit" ]
do
if read -t0.001 -u$((200+workerid)) workercommand workerquery
then
break
fi
done
(( ${#workers[@]} == 0 )) && break
if [ -n "$workercommand" ]
then then
case $workercommand in sleep 0.1
'work') else
if [ -n "$quit" ] echo '
then SELECT COUNT(*)
destroyworker $workerid FROM tasks
elif [ -n "${workertasks[workerid]}" ] WHERE status = 0;
then
echo '
SELECT
id,
source_file,
required,
cmd_arg0,
cmd_arg1,
cmd_arg2,
cmd_arg3,
cmd_arg4,
cmd_arg5,
cmd_arg6,
cmd_arg7,
cmd_arg8,
cmd_arg9,
cmd_arg10,
cmd_arg11,
cmd_arg12,
cmd_arg13,
cmd_arg14,
cmd_arg15,
cmd_arg16,
cmd_arg17,
cmd_arg18,
cmd_arg19,
cmd_arg20,
cmd_arg21,
cmd_arg22,
cmd_arg23,
cmd_arg24,
cmd_arg25,
cmd_arg26,
cmd_arg27,
cmd_arg28,
cmd_arg29,
cleanup,
fileid,
filename
FROM tasks
WHERE
id='${workertasks[workerid]}';
' >&3
read -u4 line
eval echo '"$line" >&'$((100+workerid))
elif (( active < concurrency ))
then
echo '
SELECT COUNT(*)
FROM tasks
WHERE status = 0;
SELECT COUNT(*) SELECT COUNT(*)
FROM tasks FROM tasks
WHERE status = 0 WHERE status = 0
AND requires is NULL; AND requires is NULL;
SELECT SELECT
id, id,
source_file, source_file,
required, required,
cmd_arg0, cmd_arg0,
cmd_arg1, cmd_arg1,
cmd_arg2, cmd_arg2,
cmd_arg3, cmd_arg3,
cmd_arg4, cmd_arg4,
cmd_arg5, cmd_arg5,
cmd_arg6, cmd_arg6,
cmd_arg7, cmd_arg7,
cmd_arg8, cmd_arg8,
cmd_arg9, cmd_arg9,
cmd_arg10, cmd_arg10,
cmd_arg11, cmd_arg11,
cmd_arg12, cmd_arg12,
cmd_arg13, cmd_arg13,
cmd_arg14, cmd_arg14,
cmd_arg15, cmd_arg15,
cmd_arg16, cmd_arg16,
cmd_arg17, cmd_arg17,
cmd_arg18, cmd_arg18,
cmd_arg19, cmd_arg19,
cmd_arg20, cmd_arg20,
cmd_arg21, cmd_arg21,
cmd_arg22, cmd_arg22,
cmd_arg23, cmd_arg23,
cmd_arg24, cmd_arg24,
cmd_arg25, cmd_arg25,
cmd_arg26, cmd_arg26,
cmd_arg27, cmd_arg27,
cmd_arg28, cmd_arg28,
cmd_arg29, cmd_arg29,
cleanup, cleanup,
fileid, fileid,
filename filename
FROM tasks FROM tasks
WHERE status = 0 WHERE status = 0
AND requires is NULL AND requires is NULL
ORDER BY source_file ORDER BY source_file
LIMIT 1; LIMIT 1;
' >&3 ' >&3
read -u4 remaining read -u4 remaining
read -u4 ready read -u4 ready
if (( remaining == 0 )) if (( remaining == 0 ))
then then
destroyworker $workerid sleep 0.1
continue continue
elif (( ready == 0 )) elif (( ready == 0 ))
then then
line=AtOM:Sleep sleep 0.1
else else
(( ++active )) (( ++active ))
read -u4 line read -u4 line
taskid=${line%%|*} taskid=${line%%|*}
workertasks[workerid]=$taskid rest="${line#*|}|"
Update tasks status 1 <<<"id = $taskid" sourcefileid=${rest%%|*}
fi rest=${rest#*|}
eval echo '"$line" >&'$((100+workerid)) required=${rest%%|*}
else rest=${rest#*|}
destroyworker $workerid cmd_arg=("${rest%%|*}")
fi rest=${rest#*|}
;; cmd_arg+=("${rest%%|*}")
'finished') rest=${rest#*|}
eval 'echo AtOM:OK >&'$((workerid+100)) cmd_arg+=("${rest%%|*}")
[ -z "${workertasks[workerid]}" ] && continue rest=${rest#*|}
(( ++ran )) cmd_arg+=("${rest%%|*}")
unset workertasks[workerid] rest=${rest#*|}
(( active-- )) || true cmd_arg+=("${rest%%|*}")
taskid=${workerquery%%|*} rest=${rest#*|}
rest="${workerquery#*|}|" cmd_arg+=("${rest%%|*}")
sourcefileid=${rest%%|*} rest=${rest#*|}
rest=${rest#*|} cmd_arg+=("${rest%%|*}")
destfileid=${rest%%|*} rest=${rest#*|}
rest=${rest#*|} cmd_arg+=("${rest%%|*}")
destfilename=${rest%%|*} rest=${rest#*|}
Delete tasks <<<"id = $taskid" cmd_arg+=("${rest%%|*}")
if [ -n "$destfilename" ] rest=${rest#*|}
then cmd_arg+=("${rest%%|*}")
echo \ rest=${rest#*|}
"UPDATE destination_files" \ cmd_arg+=("${rest%%|*}")
"SET filename=\"${destfilename//\"/\"\"}\"," \ rest=${rest#*|}
" last_change=(" \ cmd_arg+=("${rest%%|*}")
" SELECT last_change" \ rest=${rest#*|}
" FROM source_files" \ cmd_arg+=("${rest%%|*}")
" WHERE id=$sourcefileid" \ rest=${rest#*|}
" )," \ cmd_arg+=("${rest%%|*}")
" old_filename=(" \ rest=${rest#*|}
" SELECT filename" \ cmd_arg+=("${rest%%|*}")
" FROM destination_files" \ rest=${rest#*|}
" WHERE id=$destfileid" \ cmd_arg+=("${rest%%|*}")
" )," \ rest=${rest#*|}
" rename_pattern=(" \ cmd_arg+=("${rest%%|*}")
" SELECT rename_pattern" \ rest=${rest#*|}
" FROM tasks" \ cmd_arg+=("${rest%%|*}")
" WHERE id=$taskid" \ rest=${rest#*|}
" )" \ cmd_arg+=("${rest%%|*}")
"WHERE id=$destfileid;" \ rest=${rest#*|}
>&3 cmd_arg+=("${rest%%|*}")
fi rest=${rest#*|}
;; cmd_arg+=("${rest%%|*}")
'failed') rest=${rest#*|}
eval 'echo AtOM:OK >&'$((workerid+100)) cmd_arg+=("${rest%%|*}")
[ -z "${workertasks[workerid]}" ] && continue rest=${rest#*|}
unset workertasks[workerid] cmd_arg+=("${rest%%|*}")
(( --active )) || true rest=${rest#*|}
(( ++failed )) cmd_arg+=("${rest%%|*}")
(( ++ran )) rest=${rest#*|}
taskid=$workerquery cmd_arg+=("${rest%%|*}")
faildepends=$( rest=${rest#*|}
Select tasks 'COUNT(*)' <<-EOWhere cmd_arg+=("${rest%%|*}")
requires = $taskid rest=${rest#*|}
EOWhere cmd_arg+=("${rest%%|*}")
) rest=${rest#*|}
(( failed+=faildepends )) cmd_arg+=("${rest%%|*}")
Update tasks status 2 <<<"id = $taskid" rest=${rest#*|}
Update tasks status 2 <<<"requires = $taskid" cmd_arg+=("${rest%%|*}")
;; rest=${rest#*|}
'cleanup') cmd_arg+=("${rest%%|*}")
required=$workerquery rest=${rest#*|}
echo "SELECT COUNT(*) cleanup=${rest%%|*}
FROM tasks rest=${rest#*|}
WHERE ( status = 0 OR status = 1 ) destfileid=${rest%%|*}
AND required = $required;">&3 rest=${rest#*|}
read -u4 count destfilename=${rest%%|*}
if (( count == 0 )) rest=${rest#*|}
then for key in ${!cmd_arg[@]}
eval echo 1 '>&'$((100+workerid)) do
else [ -z "${cmd_arg[key]}" ] && unset cmd_arg[key]
eval echo 0 '>&'$((100+workerid)) done
fi workerid=$(getworkerid)
;; workertasks[workerid]=$taskid
*) Update tasks status 1 <<<"id = $taskid"
eval 'echo "AtOM:ComFail" >&'$((100+workerid)) createworker $workerid
;; fi
esac
fi fi
} }
getworkerid() { getworkerid() {
local i local i
for (( i=0 ; i < 100 ; i++ )) for (( i=0 ; i >= 0 ; i++ ))
do do
if [ -z "${workers[i]}" ] if [ -z "${workers[i]}" ]
then then
echo $i echo $i
break return
fi fi
done done
# If we reach this, we have reached the hardcoded 100 workers limit # If we reach this, we have reached the signed long limit
# (2^63 - 1 = 9223372036854775807 - Got a supercomputer?)
(( concurrency-- )) (( concurrency-- ))
} }
createworker() { createworker() {
mkfifo "$tempdir"/worker$1{in,out} worker $1 &
worker $1 <"$tempdir"/worker$1in >"$tempdir"/worker$1out &
workers[$1]=$! workers[$1]=$!
eval exec $((100+$1))'>"$tempdir"/worker$1in'
eval exec $((200+$1))'<"$tempdir"/worker$1out'
} }
destroyworker() { destroyworker() {
dyingworker=${workers[$1]} dyingworker=${workers[$1]}
unset workers[$1] unset workers[$1]
[ -z "$2" ] && eval echo AtOM:Die '>&'$((100+$1))
wait $dyingworker wait $dyingworker
eval $((100+$1))'>&-' }
eval $((200+$1))'<&-'
rm "$tempdir"/worker$1{in,out} gettaskinfos() {
echo '
SELECT
id,
source_file,
required,
cleanup,
fileid,
filename
FROM tasks
WHERE id='$1';
' >&3
read -u4 line
taskid=${line%%|*}
rest="${line#*|}|"
sourcefileid=${rest%%|*}
rest=${rest#*|}
required=${rest%%|*}
rest=${rest#*|}
cleanup=${rest%%|*}
rest=${rest#*|}
destfileid=${rest%%|*}
rest=${rest#*|}
destfilename=${rest%%|*}
rest=${rest#*|}
}
cleaner() {
for key in ${!failedtasks[@]}
do
taskid=${failedtasks[key]}
gettaskinfos $taskid
faildepends=$(
Select tasks 'COUNT(*)' <<-EOWhere
requires = $taskid
EOWhere
)
(( failed+=faildepends ))
Update tasks status 2 <<<"id = $taskid"
Update tasks status 2 <<<"requires = $taskid"
echo "SELECT COUNT(*)
FROM tasks
WHERE ( status = 0 OR status = 1 )
AND required = $taskid;">&3
read -u4 count
if (( count == 0 ))
then
rm -f "$cleanup"
fi
unset failedtasks[key]
done
for key in ${!finishedtasks[@]}
do
taskid=${finishedtasks[key]}
gettaskinfos $taskid
if [ -n "$destfilename" ]
then
echo \
"UPDATE destination_files" \
"SET filename=\"${destfilename//\"/\"\"}\"," \
" last_change=(" \
" SELECT last_change" \
" FROM source_files" \
" WHERE id=$sourcefileid" \
" )," \
" old_filename=(" \
" SELECT filename" \
" FROM destination_files" \
" WHERE id=$destfileid" \
" )," \
" rename_pattern=(" \
" SELECT rename_pattern" \
" FROM tasks" \
" WHERE id=$taskid" \
" )" \
"WHERE id=$destfileid;" \
>&3
fi
echo "SELECT COUNT(*)
FROM tasks
WHERE ( status = 0 OR status = 1 )
AND required = $taskid;">&3
read -u4 count
if (( count == 0 ))
then
rm -f "$cleanup"
fi
Delete tasks <<<"id = $taskid"
unset finishedtasks[key]
done
} }
checkworkers() { checkworkers() {
@ -1655,20 +1549,17 @@ checkworkers() {
do do
if ! kill -0 ${workers[key]} 2>/dev/null if ! kill -0 ${workers[key]} 2>/dev/null
then then
destroyworker $key nokill taskid=${workertasks[key]}
if [ -n "${workertasks[key]}" ] (( ++ran ))
(( active-- ))
if destroyworker $key
then then
faildepends=$( finishedtasks+=($taskid)
Select tasks 'COUNT(*)' <<-EOWhere else
requires = ${workertasks[key]} failedtasks+=($taskid)
EOWhere
)
(( ++failed )) (( ++failed ))
(( failed+=faildepends ))
Update tasks status 2 <<<"id = ${workertasks[key]}"
Update tasks status 2 <<<"requires = ${workertasks[key]}"
fi fi
createworker $key unset workertasks[key]
fi fi
done done
} }
@ -2115,23 +2006,21 @@ done
echo 'COMMIT;' >&3 echo 'COMMIT;' >&3
echo -e "\rCreated ${count:-0} tasks for $filecount files (${copies:-0} immediate copies)" echo -e "\rCreated ${count:-0} tasks for $filecount files (${copies:-0} immediate copies)"
masterpid=$$
trap checkworkers USR1 ALRM PIPE
rm -f "$tempdir"/worker*
concurrency=$(( maxload / 2 )) concurrency=$(( maxload / 2 ))
(( concurrency )) || concurrency=1 (( concurrency )) || concurrency=1
active=0 active=0
#set -x
for (( i=0 ; i < concurrency ; i++ )) for (( i=0 ; i < concurrency ; i++ ))
do do
createworker $(getworkerid) master
done done
concurrencychange=$(date +%s) concurrencychange=$(date +%s)
starttime=$concurrencychange starttime=$concurrencychange
taskcount=$count taskcount=$count
failed=0 failed=0
while : while (( ${#workers[@]} ))
do do
if read -n 1 -t 0.01 userinput if read -n 1 -t 0.1 userinput
then then
case $userinput in case $userinput in
'+') '+')
@ -2158,9 +2047,10 @@ do
then then
concurrencychange=$(date +%s) concurrencychange=$(date +%s)
(( ++concurrency )) (( ++concurrency ))
createworker $(getworkerid)
fi fi
fi fi
checkworkers
cleaner
master master
if ((taskcount - remaining)) if ((taskcount - remaining))
then then