multi-process

This commit is contained in:
Vincent Riquer 2013-03-18 17:02:53 +01:00
parent 7d57f59dd5
commit 76a858a40b

309
atom
View File

@ -1204,100 +1204,185 @@ transcodeLauncher() {
} }
worker() { worker() {
set +e
while : while :
do do
if [ -f "$tempdir/worker-lock" ] echo work
read line
if [[ $line == AtOM:Die ]]
then then
sleep 0.001 break
elif [[ $line == AtOM:Sleep ]]
then
sleep 1
continue continue
fi
taskid=${line%%|*}
rest="${line#*|}|"
sourcefileid=${rest%%|*}
rest=${rest#*|}
required=${rest%%|*}
rest=${rest#*|}
commandline=${rest%%|*}
rest=${rest#*|}
cleanup=${rest%%|*}
rest=${rest#*|}
destfileid=${rest%%|*}
rest=${rest#*|}
destfilename=${rest%%|*}
rest=${rest#*|}
if eval $commandline >/dev/null 2>>"$tempdir/errors.log"
then
echo "finished $taskid|$sourcefileid|$destfileid|$destfilename"
else else
touch "$tempdir/worker-lock" echo "failed $taskid"
concurrency=$(Select worker_comm value <<<"parameter = concurrency") [ -n "$filename" ] \
active_workers=$(Select tasks 'count(*)' <<<"status = 1") && eval rm -f $filename
if (( active_workers < concurrency )) fi
if [ -n "$cleanup" -a -n "$required" ]
then
echo "cleanup $required"
read answer
if (( answer == 1 ))
then then
echo ' eval rm -f $cleanup
SELECT COUNT(*) fi
FROM tasks fi
WHERE status = 0 done
AND requires is NULL; return
}
SELECT master() {
id, for workerid in ${!workers[@]}
source_file, do
required, if read -t0.001 -u$((200+workerid)) workercommand workerquery
command_line, then
cleanup, break
fileid, fi
filename done
FROM tasks (( ${#workers[@]} == 0 )) && break
WHERE status = 0 if [ -n "$workercommand" ]
AND requires is NULL then
ORDER BY source_file case $workercommand in
LIMIT 1; 'work')
' >&3 if [ -n "$quit" ]
read -u4 count
if (( count == 0 ))
then then
rm "$tempdir/worker-lock" eval echo AtOM:Die '>&'$((100+workerid))
exit wait ${workers[workerid]}
break
eval $((100+workerid))'>&-'
eval $((200+workerid))'<&-'
rm "$tempdir"/worker${workerid}{in,out}
unset workers[workerid]
elif (( active < concurrency ))
then
echo '
SELECT COUNT(*)
FROM tasks
WHERE status = 0;
SELECT COUNT(*)
FROM tasks
WHERE status = 0
AND requires is NULL;
SELECT
id,
source_file,
required,
command_line,
cleanup,
fileid,
filename
FROM tasks
WHERE status = 0
AND requires is NULL
ORDER BY source_file
LIMIT 1;
' >&3
read -u4 remaining
read -u4 ready
if (( remaining == 0 ))
then
eval echo AtOM:Die '>&'$((100+workerid))
wait ${workers[workerid]}
eval $((100+workerid))'>&-'
eval $((200+workerid))'<&-'
rm "$tempdir"/worker${workerid}{in,out}
unset workers[workerid]
continue
elif (( ready == 0 ))
then
line=AtOM:Sleep
else
(( ++active ))
read -u4 line
taskid=${line%%|*}
Update tasks status 1 <<<"id = $taskid"
fi
eval echo '"$line" >&'$((100+workerid))
else
eval echo AtOM:Die '>&'$((100+workerid))
wait ${workers[workerid]}
eval $((100+workerid))'>&-'
eval $((200+workerid))'<&-'
rm "$tempdir"/worker${workerid}{in,out}
unset workers[workerid]
fi fi
read -u4 line ;;
taskid=${line%%|*} 'finished')
rest="${line#*|}|" (( active-- )) || true
taskid=${workerquery%%|*}
rest="${workerquery#*|}|"
sourcefileid=${rest%%|*} sourcefileid=${rest%%|*}
rest=${rest#*|} rest=${rest#*|}
required=${rest%%|*}
rest=${rest#*|}
commandline=${rest%%|*}
rest=${rest#*|}
cleanup=${rest%%|*}
rest=${rest#*|}
destfileid=${rest%%|*} destfileid=${rest%%|*}
rest=${rest#*|} rest=${rest#*|}
destfilename=${rest%%|*} destfilename=${rest%%|*}
rest=${rest#*|} Delete tasks <<<"id = $taskid"
Update tasks status 1 <<<"id = $taskid" if [ -n "$destfilename" ]
rm "$tempdir/worker-lock"
if eval $commandline 2>>"$tempdir/errors.log"
then then
Delete tasks <<<"id = $taskid" echo \
if [ -n "$destfilename" ] "UPDATE destination_files" \
then "SET filename=\"$destfilename\"," \
echo \ " last_change=(" \
"UPDATE destination_files" \ " SELECT last_change" \
"SET filename=\"$destfilename\"," \ " FROM source_files" \
" last_change=(" \ " WHERE id=$sourcefileid" \
" SELECT last_change" \ " )" \
" FROM source_files" \ "WHERE id=$destfileid;" \
" WHERE id=$sourcefileid"\ >&3
" )" \ fi
"WHERE id=$destfileid;" \ ;;
>&3 'failed')
fi (( --active )) || true
(( ++failed ))
taskid=$workerquery
faildepends=$(
Select tasks 'COUNT(*)' <<-EOWhere
requires = taskid
EOWhere
)
(( failed+=faildepends ))
Update tasks status 2 <<<"id = $taskid"
Update tasks status 2 <<<"requires = $taskid"
;;
'cleanup')
required=$workerquery
echo "SELECT COUNT(*)
FROM tasks
WHERE ( status = 0 OR status = 1 )
AND required = $required;">&3
read -u4 count
if (( count == 0 ))
then
eval echo 1 '>&'$((100+workerid))
else else
Update tasks status 2 <<<"id = $taskid" eval echo 0 '>&'$((100+workerid))
[ -n "$filename" ] && rm -f "$filename"
[ -n "$cleanup" ] && rm -f "$cleanup"
fi fi
if [ -n "$cleanup" -a -n "$required" ] ;;
then esac
echo "SELECT COUNT(*) fi
FROM tasks
WHERE ( status = 0 OR status = 1 )
AND required = $required;">&3
read -u4 count
if (( count == 0 ))
then
eval rm $cleanup
fi
fi
else
rm "$tempdir/worker-lock"
exit
fi
fi
done
} }
#UI #UI
@ -1491,12 +1576,6 @@ echo '
); );
CREATE INDEX tasks_by_key ON tasks ( key ); CREATE INDEX tasks_by_key ON tasks ( key );
CREATE INDEX tasks_by_sourcefile ON tasks ( source_file ); CREATE INDEX tasks_by_sourcefile ON tasks ( source_file );
CREATE TEMPORARY TABLE worker_comm(
parameter TEXT UNIQUE NOT NULL,
value TEXT
);
CREATE INDEX wrkcomm_by_param ON worker_comm ( parameter );
' >&3 ' >&3
echo ' echo '
@ -1686,10 +1765,68 @@ do
done done
echo 'COMMIT;' >&3 echo 'COMMIT;' >&3
echo -e "\rCreated ${count:-0} tasks for $filecount files" echo -e "\rCreated ${count:-0} tasks for $filecount files"
unset count
InsertOrUpdate worker_comm value 1 <<<"parameter concurrency" rm -f "$tempdir"/worker*
worker concurrency=$(( maxload / 2 ))
active=0
for (( i=0 ; i < concurrency ; i++ ))
do
(( ++wnum ))
mkfifo "$tempdir"/worker${wnum}{in,out}
worker $wnum <"$tempdir"/worker${wnum}in >"$tempdir"/worker${wnum}out &
workers[wnum]=$!
eval exec $((100+wnum))'>"$tempdir"/worker${wnum}in'
eval exec $((200+wnum))'<"$tempdir"/worker${wnum}out'
done
concurrencychange=$(date +%s)
taskcount=$count
while :
do
if read -n 1 -t 0.01 userinput
then
case $userinput in
'+')
((maxload++))
;;
'-')
((--maxload)) || ((maxload=1))
;;
[qQ])
quit=1
;;
esac
fi
read humanload garbage < /proc/loadavg
load=${humanload%.*}
if [ -z "$quit" ] && (( $(date +%s)-concurrencychange >= loadinterval ))
then
if (( concurrency > 1 )) \
&& (( load > maxload ))
then
concurrencychange=$(date +%s)
(( --concurrency ))
elif (( load < maxload ))
then
concurrencychange=$(date +%s)
(( ++concurrency ))
(( ++wnum ))
mkfifo "$tempdir"/worker${wnum}{in,out}
worker $wnum \
<"$tempdir"/worker${wnum}in \
>"$tempdir"/worker${wnum}out &
workers[wnum]=$!
eval exec $((100+wnum))'>"$tempdir"/worker${wnum}in'
eval exec $((200+wnum))'<"$tempdir"/worker${wnum}out'
fi
fi
master
echo -en "\rload: $humanload / $maxload" \
" workers: $active / $concurrency" \
" done: $(( (taskcount - remaining ) * 100 / taskcount ))%" \
" $((taskcount - remaining)) of $taskcount ($failed failed)"
done
echo
closeDatabase closeDatabase