tech.guitarrapc.cóm

Technical updates

PowerShell による同期処理、非同期処理、並列処理 を考えてみる

PowerShell の一番つらいところは、非同期な Cmdlet やキーワード*1が用意されていないことです。

そこで前回の記事 で作成したコードを使って、同期、非同期、並列の3つのパターンに関して見てみましょう。

拙作のPowerShellによる Deployライブラリvalentia でも大枠は同様に同期、非同期、並列に処理を行っています。

目次

GitHub

サンプルリポジトリは前回同様に以下です。

GitHub - guitarrapc / PS-WaybackMachineAvailavility

同期処理

まずはコードから見てみましょう。

function Get-WaybackMachineAvailavility
{
    [CmdletBinding()]
    Param
    (
        # Input an uri you want to search.
        [Parameter(
            Mandatory = 1,
            ValueFromPipeline,
            ValueFromPipelineByPropertyName,
            Position=0)]
        [string[]]
        $urls,


        # Input timestamp to obtain closed date you want. Make sure as format 'yyyyMMddHHmmss' or 'yyyy' or 'yyyyMM' or 'yyyyMMdd' or else.('2006' will tring to obtain closed to 2006)
        [Parameter(
            Mandatory = 0,
            Position=1)]
        [string]
        $timestamp,

        # Invoke request with async
        [switch]
        $async
    )

    Begin
    {
        # base settings for query
        $private:baseUri = "http://archive.org/wayback/available"
        $private:baseQuery = "?url="
        $private:timestampQuery = "×tamp="
    }
    Process
    {
        foreach($url in $urls)
        {
            # build query
            $private:query = "$baseQuery{0}" -f $url | where {$_}

                # validate timestamp parameter for query
                if (-not [string]::IsNullOrWhiteSpace($PSBoundParameters.timestamp))
                {
                    $private:trimTimestampQuery = $PSBoundParameters.timestamp | where {$_}
                    $private:query = "$query{0}{1}" -f $timestampQuery, $trimTimestampQuery
                }

            # build query uri
            $private:queryUri = (@($baseUri,$query) | where { $_ } | % { ([string]$_).Trim('/') } | where { $_ } ) -join '/'

            # invoke request
            Write-Verbose ("trying to collect availability of Wayback Time machine for uri '{0}' from API '{1}'" -f $url, $baseUri)
            Write-Verbose ("Whole query string '{0}'" -f $queryUri)

            # using Invoke-RestMethod
            $private:task = Invoke-RestMethod -Method Get -Uri $queryUri -UserAgent ("PowerShell {0}" -f $PSVersionTable.PSVersion)

            # get reuslt
            $private:result =  $task.archived_snapshots.closest

            # create sorted hashtable to create object
            $obj = [ordered]@{
                available = $result.available
                status = $result.status
                timestamp = $result.timestamp
                url = $result.url
                queryInformation = @{
                    url = $url
                    queryUri = $queryUri
                }
            }

            # create PSObject to output
            $output = New-Object -TypeName PSObject -Property $obj
            $output
        }
    }
}

解説

Begin{} Process{} End{} について

まず目につくのが、Begin{} Process{}です。

これらは、 Param()に記述している$urls のパラメータValueFromPipelineValueFromPipelineByPropertyName を処理するために必要です。

ValueFromPipelineValueFromPipelineByPropertyNameは、対象のパラメータを パイプラインから受けることが可能であることを宣言しています。

  • ValueFromPipeline : パイプラインからの入力を自分に割り当てるモノです
  • ValueFromPipelineByPropertyName : パイプラインからの入力でプロパティ名が合致したものを自分に割り当てるモノです

パイプライン越しの入力とは以下のような入力を指します。

# Synchronous pipeline invokation
"http://tech.guitarrapc.com","http://neue.cc" | Get-WaybackMachineAvailavility

この パイプラインからの入力で、繰り返し処理を行うために利用するのが、Begin{} Process{} End{} です。

  • Begin句の処理は 初回に一度だけ実行されます
  • 一方で、Process句の処理は パイプラインのたびに実行されます
  • そしてここでは使っていませんが、End{}は、全てのProcess{}句の完了後に最後に1回だけ実行されます

つまり、 Begin{} Process{} End{} を用いることで、 パイプラインの入力に関しては自動的に 繰り返し実行されます。

Begin{} 処理の内容

ここでは、 クエリの基本となる変数を定めています。

Process{}処理の内容

foreach の利用

foreach を利用しています。

これは、 Cmdlet に urlsパラメータを直接した場合、Begin{} Process{} End{}では繰り返し処理されないためです。

つまり以下の指定にした場合、 Begin{} Process{} End{}では初めの1アドレスしか処理されません。

Get-WaybackMachineAvailavility -urls "http://tech.guitarrapc.com","http://neue.cc"

そのため foreach を使って urlsパラメータに直接複数のアドレスを指定しても処理ができるようにしています。

クエリの生成

残りは、 クエリの生成とInvoke-RestMethod でのJSON取得です。

余り使っているのを見かけないのですが、 PowerShell で URI を生成するときは、以下のワンライナーが便利です。

(@($baseUri,$query) | where { $_ } | % { ([string]$_).Trim('/') } | where { $_ } ) -join '/'

こうすることで、パーツに指定した $baseUri$queryが空白だった場合でも正常に URIが生成できます。

Invoke-RestMethod

Invoke-RestMethod は、 Invoke-WebRequest と違い、返却されたJSONを解釈するように内部でごちゃごちゃ処理してくれています。*2

PowerShell 3.0 では、対象のJSONによってはヘッダが途中までしか読めないなど RSS リーダとして利用するには致命的なバグがありました。

が、 PowerShell 4.0 でバグが修正され安心して利用できるようになっています。

後は、JSONが格納されたプロパティを指定してあげるだけです。

カスタムオブジェクト合成

最後にカスタムオブジェクトを生成しています。

簡単ですね。要領はこのやり方で残りの処理を見てみましょう。

非同期処理

メインテーマの非同期処理です。

最近になって、 Hey, Scripting Guy! Blog - Weekend Scripter: Max Out PowerShell in a Little Bit of Time—Part 2 でも、runspace を利用した 非同期処理に関して紹介されるようになりました。

拙作のvalentia でも、同様の手法を採用しています。

では、まずは非同期でのコードを見てみましょう。

function Get-WaybackMachineAvailavilityAsync
{
    [CmdletBinding()]
    Param
    (
        # Input an uri you want to search.
        [Parameter(
            Mandatory = 1,
            Position=0)]
        [string[]]
        $urls,


        # Input timestamp to obtain closed date you want. Make sure as format 'yyyyMMddHHmmss' or 'yyyy' or 'yyyyMM' or 'yyyyMMdd' or else.('2006' will tring to obtain closed to 2006)
        [Parameter(
            Mandatory = 0,
            Position=1)]
        [string]
        $timestamp
    )

    try
    {
        # create Runspace
        Write-Debug ("creating runspace for powershell")
        $private:sessionstate = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()
        $private:minPoolSize = $maxPoolSize = 50 # 50 runspaces
        $private:runspacePool = [runspacefactory]::CreateRunspacePool($minPoolSize, $maxPoolSize,  $sessionstate, $Host) # create Runspace Pool
        $runspacePool.ApartmentState = "STA" # only STA mode supports
        $runspacePool.Open() # open pool


        # start process
        foreach ($url in $urls)
        {
            Write-Debug ("start creating command for '{0}'" -f $url)
            $command = {
                [CmdletBinding()]
                param
                (
                    [parameter(
                        mandatory,
                        position = 0)]
                    [string]
                    $url,

                    [parameter(
                        mandatory = 0,
                        position = 1)]
                    [int]
                    $timestamp,

                    [parameter(
                        mandatory = 0,
                        position = 2)]
                    [string]
                    $VerbosePreference
                )

                # change ErrorActionPreference
                Write-Debug "set continue with error as http client requires dispose when method done."
                $private:originalErrorActionPreference = $ErrorActionPreference
                $ErrorActionPreference = "Continue"
                
                # base settings for query
                $private:baseUri = "http://archive.org/wayback/available"
                $private:baseQuery = "?url="
                $private:timestampQuery = "×tamp="

                # build query
                $private:query = "{0}{1}" -f $baseQuery, $url | where {$_}

                    # validate timestamp parameter for query
                    if (-not [string]::IsNullOrWhiteSpace($timestamp))
                    {
                        $private:trimTimestampQuery = $timestamp | where {$_}
                        $private:query = "$query{0}{1}" -f $timestampQuery, $trimTimestampQuery
                    }

                # build query uri
                $private:queryUri = (@($baseUri,$query) | where { $_ } | % { ([string]$_).Trim('/') } | where { $_ } ) -join '/'

                # Load Assembly to use HttpClient
                try
                {
                    Add-Type -AssemblyName System.Net.Http
                }
                catch
                {
                }

                # new HttpClient
                $httpClient = New-Object -TypeName System.Net.Http.HttpClient
                $httpClient.BaseAddress = $private:baseUri

                # invoke http client request
                Write-Verbose ("trying to collect availability of Wayback Time machine for uri '{0}' from API '{1}'" -f $url, $baseUri)
                Write-Verbose ("Whole query string '{0}'" -f $queryUri)
                $private:task = $httpClient.GetStringAsync($queryUri)
                $task.wait()
                
                # return result
                $task

                # dispose HttpClient
                $httpClient.Dispose()

                # reverse ErrorActionPreference
                $ErrorActionPreference = $originalErrorActionPreference
            }

            # Verbose settings for Async Command inside
            Write-Debug "set VerbosePreference inside Asynchronous execution"
            if ($PSBoundParameters.Verbose.IsPresent)
            {
                $private:verbose = "continue"
            }
            else
            {
                $private:verbose = $VerbosePreference
            }

            # Main Invokation
            Write-Debug "start asynchronous invokation"
            $private:powershell = [PowerShell]::Create().AddScript($command).AddArgument($url).AddArgument($timestamp).AddArgument($verbose)
            $powershell.RunspacePool = $runspacePool
            [array]$private:RunspaceCollection += New-Object -TypeName PSObject -Property @{
                Runspace = $powershell.BeginInvoke();
                powershell = $powershell
            }
        }


        # check process result
        Write-Debug "check asynchronos execution has done"
        while (($runspaceCollection.RunSpace | sort IsCompleted -Unique).IsCompleted -ne $true)
        {
            sleep -Milliseconds 5
        }

        # get process result and end powershell session
        Write-Debug "obtain process result"
        foreach ($runspace in $runspaceCollection)
        {
            # obtain Asynchronos command result
            $private:task = $runspace.powershell.EndInvoke($runspace.Runspace)

            # show result
            if ($task.IsCompleted)
            {
                # get reuslt
                $private:result = ($task.Result | ConvertFrom-Json).archived_snapshots.closest
                # create sorted hashtable to create object
                $private:obj = [ordered]@{
                    available = $result.available
                    status = $result.status
                    timestamp = $result.timestamp
                    url = $result.url
                    queryInformation = @{
                        url = $url
                        queryUri = $queryUri
                    }
                }

                # create PSObject to output
                $private:output = New-Object -TypeName PSObject -Property $obj

                # return result into host
                $output
            }

            # Dispose pipeline
            $runspace.powershell.Dispose()
        }
    }
    finally
    {
        # Dispose Runspace
        $runspacePool.Dispose()
    }
}

解説

本題の非同期処理について説明します。

非同期処理全体の流れは次の通りです。

  1. RunspacePoolの生成

  2. RunspacePoolのオープン

  3. 非同期に行うターゲットごとに処理開始

  4. 非同期に行うターゲットごとに実行するコマンドを生成

  5. 非同期に行うターゲットごとにPowerShellインスタンスを生成

  6. 非同期に行うターゲットごとに生成したPowerShellインスタンスにコマンドや引数を渡す

  7. 非同期に行うターゲットごとに生成したPowerShellインスタンスにRunspaceを割り当てる

  8. 非同期に行うターゲットごとに生成したPowerShellインスタンスを実行

  9. 非同期に実行したコマンドの状態を監視

  10. PowerShellインスタンスごとにコマンド実行結果を取得

  11. 生成したPowerShellインスタンスとRunspaceのクリーンアップ

Begin{} Process{} End{} について

パイプラインからのストリーム入力は 1本です。そのためPipeline を使って非同期に処理することはできません。

これが、非同期処理でBegin{} Process{} End{}を用いた パイプライン入力をサポートしない理由です。

非同期のポイントは RunspacePoolの生成

    try
    {
        # create Runspace
        Write-Debug ("creating runspace for powershell")
        $private:sessionstate = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()
        $private:minPoolSize = $maxPoolSize = 50 # 50 runspaces
        $private:runspacePool = [runspacefactory]::CreateRunspacePool($minPoolSize, $maxPoolSize,  $sessionstate, $Host) # create Runspace Pool
        $runspacePool.ApartmentState = "STA" # only STA mode supports

PowerShell での 非同期処理のポイントは[RunspaceFactory] を生成してマルチスレッドを実現することにあります。

単純にRunspaceを生成するだけなら、以下で行えます。

$RunspacePool = [RunspaceFactory]::CreateRunspacePool()

が、SessionStatePoolSizeを指定することでRunspacePoolサイズのハンドルが可能です。

CreateRunspacePool Methodについて、詳しくは Developer Networkをどうぞ。

Developer Network - RunspaceFactory.CreateRunspacePool Method.ASPX)

また、ここで生成したRunspacePoolは処理の成功如何にかかわりなく、破棄が必要なため、try{}finally{}としています。

RunspacePoolのオープン

        $runspacePool.Open() # open pool

必要なサイズの Runspaceを生成したら、.Open()します。

コラム : Job は非同期ではない

時々、「Job を使った非同期?」という謎の記事を見ることがありますが、これには日頃疑念を抱いています。

PowerShell 2.0 において、Jobの考えや手法が導入されましたが、これを利用して非同期処理を行うことは困難を極めます。なぜならStart-Job-AsJobが行っているのは、バックグランドに別のRunspaceを1つ生成して、バックグランド処理を行っているだけです。

つまり、Jobはマルチスレッドでないため、表面上ホスト処理を止めないだけで処理自体の高速化はされません。非同期でないことは計測すれば一目両全です。

もちろん、使い方次第で、通常の処理をバックグランドに回すことでロスをなくし高速化を果たすことは可能です。Jobの使いどころはそれほど多くありませんが、ハマルと気軽に使えて便利です。

foreach で 各urlを処理する
        # start process
        foreach ($url in $urls)
        {

RunSpaceに入れた処理は、バックグラウンドで行われます。

そのため、foreach で順次実行してもそれほどのロスにはなりません。*3

Runspaceで実行する Command の生成

Runspace には、PowerShellインスタンスへの.AddScript()メソッドで処理を差し込みます。

この.AddScript()メソッドは、ScriptBlock を受けれるので、実行したいScriptBlockを生成しましょう。

クエリの生成
            Write-Debug ("start creating command for '{0}'" -f $url)
            $command = {
                [CmdletBinding()]
                param
                (
                    [parameter(
                        mandatory,
                        position = 0)]
                    [string]
                    $url,

                    [parameter(
                        mandatory = 0,
                        position = 1)]
                    [int]
                    $timestamp,

                    [parameter(
                        mandatory = 0,
                        position = 2)]
                    [string]
                    $VerbosePreference
                )

                # change ErrorActionPreference
                Write-Debug "set continue with error as http client requires dispose when method done."
                $private:originalErrorActionPreference = $ErrorActionPreference
                $ErrorActionPreference = "Continue"
                
                # base settings for query
                $private:baseUri = "http://archive.org/wayback/available"
                $private:baseQuery = "?url="
                $private:timestampQuery = "×tamp="

                # build query
                $private:query = "{0}{1}" -f $baseQuery, $url | where {$_}

                    # validate timestamp parameter for query
                    if (-not [string]::IsNullOrWhiteSpace($timestamp))
                    {
                        $private:trimTimestampQuery = $timestamp | where {$_}
                        $private:query = "$query{0}{1}" -f $timestampQuery, $trimTimestampQuery
                    }

                # build query uri
                $private:queryUri = (@($baseUri,$query) | where { $_ } | % { ([string]$_).Trim('/') } | where { $_ } ) -join '/'

先の同期処理の内容同様に、クエリを生成しています。

但し、対象のurlは、それぞれ違うので、 param()でパラメータを指定しています。 仮にparam()を利用しなかった場合は、args[0]などで代りに受けることが可能です。

HttpClientの利用
                # Load Assembly to use HttpClient
                try
                {
                    Add-Type -AssemblyName System.Net.Http
                }
                catch
                {
                }

                # new HttpClient
                $httpClient = New-Object -TypeName System.Net.Http.HttpClient
                $httpClient.BaseAddress = $private:baseUri

                # invoke http client request
                Write-Verbose ("trying to collect availability of Wayback Time machine for uri '{0}' from API '{1}'" -f $url, $baseUri)
                Write-Verbose ("Whole query string '{0}'" -f $queryUri)
                $private:task = $httpClient.GetStringAsync($queryUri)
                $task.wait()
                
                # return result
                $task

                # dispose HttpClient
                $httpClient.Dispose()

                # reverse ErrorActionPreference
                $ErrorActionPreference = $originalErrorActionPreference

せっかくなのでここでは、.NETに追加された HttpClientをPowerShellで利用しています。

もうWebRequestより各段に便利!です。が、PowerShell に async/awaitがないのでメソッドがもったいないような気もしないではないけど便利です。

HttpClinetの処理後は、生成したインスタンスを.Dispose()しておきます。*4

ErrorPreferenceの変更と差し戻し

先のクエリ生成前の冒頭で、ErrorPreferenceをContinue に明示的に指定しています。

これは、非同期のどの処理でもエラーが出ても継続させたいからです。

もしエラーで止める場合は、Stopを指定して、try{}catch{}などで補足可能です。

                # change ErrorActionPreference
                Write-Debug "set continue with error as http client requires dispose when method done."
                $private:originalErrorActionPreference = $ErrorActionPreference
                $ErrorActionPreference = "Continue"

コマンドの最後で、元のErrorPreferenceに戻しています。


                # reverse ErrorActionPreference
                $ErrorActionPreference = $originalErrorActionPreference

非同期で処理するコマンドでもVerboseで動作させる

            # Verbose settings for Async Command inside
            Write-Debug "set VerbosePreference inside Asynchronous execution"
            if ($PSBoundParameters.Verbose.IsPresent)
            {
                $private:verbose = "continue"
            }
            else
            {
                $private:verbose = $VerbosePreference
            }

例え、Get-WaybackMachineAvailavilityAsync-Verboseスイッチ付きで実行しても、Runspaceで実行するコマンドがVerboseにならないのは自明です。

上記の処理で、Get-WaybackMachineAvailavilityAsync-Verboseスイッチが利用された時の状態を取得しています。

非同期で処理するPowershellインスタンスを生成と実行

            # Main Invokation
            Write-Debug "start asynchronous invokation"
            $private:powershell = [PowerShell]::Create().AddScript($command).AddArgument($url).AddArgument($timestamp).AddArgument($verbose)
            $powershell.RunspacePool = $runspacePool
            [array]$private:RunspaceCollection += New-Object -TypeName PSObject -Property @{
                Runspace = $powershell.BeginInvoke();
                powershell = $powershell
            }
        }

ここが、非同期実行のメイン処理です。*5

非同期で処理するコマンドにScriptBlockと引数とVerboseを含める

生成したRunspacePoolを実行する PowerShell Instanceを生成します。

[PowerShell]::Create()

ここに.AddScript()メソッドで、先ほど生成したScriptBlockを差し込みます。

[PowerShell]::Create().AddScript($command)

さらに.AddArgument()メソッドで.AddScript()で差し込んだScriptBlockに渡したいパラメータを渡します。

[PowerShell]::Create().AddScript($command).AddArgument($url).AddArgument($timestamp).AddArgument($verbose)

コマンドの生成後は、生成した PowerShellインスタンスにRunspacePoolを割り当てます。

            $powershell.RunspacePool = $runspacePool

準備ができたPowershellインスタンスを実行する際は、.BeginInvoke()メソッドを利用します。

また後々、PowerShellインスタンスを.Dispose()するために、オブジェクトで保持します。

            New-Object -TypeName PSObject -Property @{
                Runspace = $powershell.BeginInvoke();
                powershell = $powershell
            }

この処理は、各url に対して行われます。全urlでの実行結果を受けるため、ここでは[array]で受けています。もはや[System.Collection]ArrayListHashTableは産廃*6らしいのでPowerShellとしては辛いですね。

            [array]$private:RunspaceCollection += New-Object -TypeName PSObject -Property @{
                Runspace = $powershell.BeginInvoke();
                powershell = $powershell
            }
        }

非同期処理の状態を監視する

        # check process result
        Write-Debug "check asynchronos execution has done"
        while (($runspaceCollection.RunSpace | sort IsCompleted -Unique).IsCompleted -ne $true)
        {
            sleep -Milliseconds 5
        }

非同期処理の管理は、先ほどコマンド実行結果したオブジェクトのIsCompletedプロパティで判断できます。

ここでは、全体をユニークソートして、全てが$trueになったかで判定しています。

なお、Select -UniqueGet-Uniqueは本当に使えないのでお気を付けください。*7

非同期処理結果を取得する

        # get process result and end powershell session
        Write-Debug "obtain process result"
        foreach ($runspace in $runspaceCollection)
        {
            # obtain Asynchronos command result
            $private:task = $runspace.powershell.EndInvoke($runspace.Runspace)

            # show result
            if ($task.IsCompleted)
            {
                # get reuslt
                $private:result = ($task.Result | ConvertFrom-Json).archived_snapshots.closest
                # create sorted hashtable to create object
                $private:obj = [ordered]@{
                    available = $result.available
                    status = $result.status
                    timestamp = $result.timestamp
                    url = $result.url
                    queryInformation = @{
                        url = $url
                        queryUri = $queryUri
                    }
                }

                # create PSObject to output
                $private:output = New-Object -TypeName PSObject -Property $obj

                # return result into host
                $output
            }

            # Dispose pipeline
            $runspace.powershell.Dispose()
        }

ここで、非同期処理を行った結果をホストに取得しています。

各RunSpace結果を順次取得する
        foreach ($runspace in $runspaceCollection)
        {

各Urlに対して行った結果を格納したRunspaceCollection変数からRunspaceを取得します。

.EndInvoke()することでコマンド実行結果を取得する
            # obtain Asynchronos command result
            $private:task = $runspace.powershell.EndInvoke($runspace.Runspace)

コマンドの実行は、.BeginInvoke()メソッドでした。

コマンドの実行結果取得は、.EndInvoke()メソッドです。

このメソッド実行時に、通常のホスト画面でコマンドを実行したように、非同期で各RunSpaceにて実行したPowerShellインスタンス結果が取得できます。

コマンド実行後の取得結果JSONをカスタムオブジェクトに格納する
            # show result
            if ($task.IsCompleted)
            {
                # get reuslt
                $private:result = ($task.Result | ConvertFrom-Json).archived_snapshots.closest
                # create sorted hashtable to create object
                $private:obj = [ordered]@{
                    available = $result.available
                    status = $result.status
                    timestamp = $result.timestamp
                    url = $result.url
                    queryInformation = @{
                        url = $url
                        queryUri = $queryUri
                    }
                }

                # create PSObject to output
                $private:output = New-Object -TypeName PSObject -Property $obj

                # return result into host
                $output
            }
        }
    }

同期処理では、Invoke-RestMethodを利用したので、JSON結果が自動的にオブジェクトに変換されました。

が、今回は、HttpClientを利用しているので、自前で変換します。

JSONからオブジェクト変換は、PowerShell 3.0から追加されたConvertFrom-Jsonが便利です。

PowerShellインスタンスの破棄
            # Dispose pipeline
            $runspace.powershell.Dispose()
        }
    }

RunSpaceごとに取得結果をオブジェクトに合成、出力したらもうPowerShellインスタンスは不要です。

.Dispose()して挙げましょう。

これで、PowerShellインスタンスによる処理が終了したので、try{}も完了です。

Finally{}とRunspacePoolの破棄

        }
    }
    finally
    {
        # Dispose Runspace
        $runspacePool.Dispose()
    }
}

生成したRunSpacePool も、全ての処理が完了したら破棄します。

finally{}にしておくことで、PowerShellインスタンスのエラーが起こっても生成したRunspacePoolは破棄されます。

並列処理

いよいよ最後です。

まずはコードから見てみましょう。

workflow Get-WaybackMachineAvailavilityParallel
{
    [CmdletBinding()]
    Param
    (
        # Input an uri you want to search.
        [Parameter(
            Mandatory = 1,
            Position=0)]
        [string[]]
        $urls,


        # Input timestamp to obtain closed date you want. Make sure as format 'yyyyMMddHHmmss' or 'yyyy' or 'yyyyMM' or 'yyyyMMdd' or else.('2006' will tring to obtain closed to 2006)
        [Parameter(
            Mandatory = 0,
            Position=1)]
        [string]
        $timestamp
    )

    # base settings for query
    $baseUri = "http://archive.org/wayback/available"
    $baseQuery = "?url="
    $timestampQuery = "×tamp="


    # start process
    foreach -parallel ($url in $urls)
    {
        Write-Debug ("start creating command for '{0}'" -f $url)
        
        # build query
        $query = "$baseQuery{0}" -f ($url | where {$_})

        # validate timestamp parameter for query
        if (-not [string]::IsNullOrWhiteSpace($timestamp))
        {
            $trimTimestampQuery = $timestamp | where {$_}
            $query = "$query{0}{1}" -f $timestampQuery, $trimTimestampQuery
        }

        # build query uri
        $queryUri = (@($baseUri,$query) | where { $_ } | % { ([string]$_).Trim('/') } | where { $_ } ) -join '/'

        # invoke request
        Write-Verbose -Message ("trying to collect availability of Wayback Time machine for uri '{0}' from API '{1}'" -f $url, $baseUri)
        Write-Verbose -Message ("Whole query string '{0}'" -f $queryUri)

        # using Invoke-RestMethod
        $task = Invoke-RestMethod -Method Get -Uri $queryUri -UserAgent ("PowerShell {0}" -f $PSVersionTable.PSVersion)

        # get reuslt
        $result =  $task.archived_snapshots.closest

        # create sorted hashtable to create object
        $obj = [ordered]@{
            available = $result.available
            status = $result.status
            timestamp = $result.timestamp
            url = $result.url
            queryInformation = @{
                url = $url
                queryUri = $queryUri
            }
        }

        # create PSObject to output
        $output = New-Object -TypeName PSObject -Property $obj
        $output
    }
}

解説

並列処理には、Workflowを利用します。

Workflowは、構文はほぼfunctionと変わらず利用できるため、非常に便利で有効に利用できるものです。

が、非同期ではなく並列処理なので劇的な速度改善にはなりません。くれぐれも過信されないようにご注意ください。

Begin{} Process{} End{} について

Workflowなのでそもそもこれらのキーワードは使えません。

また、非同期同様の理由でいずれにしても不可です。

クエリの基本変数を設定

    # base settings for query
    $baseUri = "http://archive.org/wayback/available"
    $baseQuery = "?url="
    $timestampQuery = "×tamp="

クエリの基本となる変数を定めています。

Workflow の foreach -parallel を利用した並列処理

    foreach -parallel ($url in $urls)
    {

Workflowに関しては、以前にもPowerShell における Windows Workflow Foundation 4.0 (WF) 利用のすすめとして記事にしています。

ここで記載した、 foreach -parallel が、並列処理となります。

-parallelパラメータを指定すると、シングルスレッドですが並列に順不同で5本実行されます。

非同期処理は、処理対象はforeach で渡していたので順次実行です。ここも今回紹介した並列実行と非同期実行では異なります。

詳しくは、縄神様の解説をご参照ください。

てすとぶろぐ - ワークフロー上のアクティビティを非同期に複数動作させるには てすとぶろぐ - PowerShell 3.0 の ForEach –parallel はマルチスレッドではない

foreach -parallelの中身は、同期処理と同様

        Write-Debug ("start creating command for '{0}'" -f $url)
        
        # build query
        $query = "$baseQuery{0}" -f ($url | where {$_})

        # validate timestamp parameter for query
        if (-not [string]::IsNullOrWhiteSpace($timestamp))
        {
            $trimTimestampQuery = $timestamp | where {$_}
            $query = "$query{0}{1}" -f $timestampQuery, $trimTimestampQuery
        }

        # build query uri
        $queryUri = (@($baseUri,$query) | where { $_ } | % { ([string]$_).Trim('/') } | where { $_ } ) -join '/'

        # invoke request
        Write-Verbose -Message ("trying to collect availability of Wayback Time machine for uri '{0}' from API '{1}'" -f $url, $baseUri)
        Write-Verbose -Message ("Whole query string '{0}'" -f $queryUri)

        # using Invoke-RestMethod
        $task = Invoke-RestMethod -Method Get -Uri $queryUri -UserAgent ("PowerShell {0}" -f $PSVersionTable.PSVersion)

        # get reuslt
        $result =  $task.archived_snapshots.closest

        # create sorted hashtable to create object
        $obj = [ordered]@{
            available = $result.available
            status = $result.status
            timestamp = $result.timestamp
            url = $result.url
            queryInformation = @{
                url = $url
                queryUri = $queryUri
            }
        }

        # create PSObject to output
        $output = New-Object -TypeName PSObject -Property $obj
        $output
    }
}

もはや同期処理と変わりません。

今回は、Workflow の制限に当たるCmdletも利用しなかったので、InlineScript{}も利用していません。

この function との互換性で並列実行が可能になるのが、PowerShell Workflow のメリットの1つですね。*8

まとめ

PowerShell における 非同期実行は Runspaceを利用することが現在の主流です。

よりよいやり方は常に模索していますが、何かヒントがあればうかがえると喜びます。

対象の数による処理速度の違いは次の通りです。

処理 対象少処理速度 対象多処理速度 備考
同期 X 対象の数に比例して時間がかかる
非同期 対象が多くてもRunspaceに割り当てて最大限非同期に処理するため、対象が多いと最速
並列 並列5本ずつ実行する。

セッションの保持の違いによる、リモートホストへの実行速度

リモートホストへの実行の場合、セッションの持ち方の違いで、セッションを持ったままの同期/並列、とセッションが毎回切れる非同期で少し違いがあります。

処理 初回処理速度 2回目処理速度 備考
同期 セッションを維持していた場合、2回目以降は接続は高速
非同期 初回の同期処理とほぼ同程度。2回目以降も初回とずっと変わらない*9
並列 初回の接続は同期処理より若干時間がかかるが、二回目以降はローカル並の爆速

*1:C#のasync/awaitなど

*2:言い方悪い

*3:数にもよりますが

*4:usingないの辛いです

*5:あえて1つのfunctionに押し込めたせい読みにくい

*6:要出典

*7:この2ついらない

*8:状態保持や、Activityの自由な組み合わせこそが一番のメリットだと思いますが。

*9:セッションは毎回切れるので速度に変化がでない