BigQuery

BigQueryのJSONスキーマとJSON形式ロードジョブの扱い

はじめに

BigQueryを扱う際、JSONに関する情報が少なく
非常に苦労したため、ノウハウを共有します。
尚、一部例示するコードはGASです。
またJSONはいわゆる一般的なオブジェクト配列を指します。

 

事前準備

検証に使うテーブルに、JSON型のカラムを用意してください。
また、サンプルコードを実行したい場合は、
GASのエディタを開いてBigQueryサービスを有効化しておいてください。

 

JSONカラムへのINSERT・UPDATE

まず、基本的なクエリの書き方はこんな感じです。

"UPDATE `projectId.datasetId.tableId` 
 SET カラム名 = JSON'ここにJSONstring'
 WHERE id = '12345"

JSONstringに”JSON”を接頭する必要があります。

GASで作成する場合はシンプルにJSON.stringifyを使えばいいです。

"UPDATE `projectId.datasetId.tableId` " + 
` SET カラム名 = JSON'${JSON.stringify(jsonData)}' WHERE id = '12345" `;

JSON形式でのloadジョブ登録

こちらは通常のクエリリクエストではなくloadジョブの方法となります。
例えばテーブルのカラムが「name(string),id(string),info(json)」だったとして、
以下のようなJSONを一気に取り込む事が可能です。

[
  {
    name:"山田太郎",
    id:"0001",
    info:[{
            type:"parttime",
            sex:"female",
            tel:"111-1111-1111",
         }]
  },
  {
    name:"山田花子",
    id:"0002",
    info:[{
            type:"fulltime",
            sex:"male",
            tel:"222-1111-1111",
         }]
  },
]

loadジョブのイメージは湧きましたでしょうか。
テーブルに初期データを一気に格納する際などに便利です。

さて、ではloadジョブの作成方法です。
上述のカラム構成とJSONを使うとして、以下のようなGASコードとなります。
このコードを実行するとJSONの内容が一括で登録されます。
尚、データ量の制約などもありますので、必要に応じて分割実行してください。

const json = 省略 上述のJSONがあるものとする;const schema = [
  {name: "name", type: "STRING"},
  {name: "id", type: "STRING"},
  {name: "info", type: "STRING"}
];
const jobInfo = {
  configuration: {
    load: {
      destinationTable: {
        projectId: projectId,
        datasetId: datasetId,
        tableId: tableName
      },
      schema: {
        fields: schema
      },
      sourceFormat: 'NEWLINE_DELIMITED_JSON'
    }
  }
};
const jsonl = json.map(x => JSON.stringify(x)).join('\n');
const blob = Utilities.newBlob(jsonl);
const job = BigQuery.Jobs.insert(jobInfo, dbSetting.projectId, blob);

最も重要なポイントは、JSONの加工ですね。
JSONLという改行区切JSON形式に変換しています。

ちなみにロードジョブの結果受取はこんな感じでいいかなと。

function getLoadJobResult(job){
  const projectId = job.jobReference.projectId;
  const jobId = job.jobReference.jobId;
  let result = BigQuery.Jobs.get(projectId, jobId);

  let count = 0;
  while (result.status.state !== "DONE") {
    console.log(result);
    console.log("loadResponse待ち...")
    if(count==10) throw(`GetLoadJobResult10回失敗${result}`);
    Utilities.sleep(2000);
    result = BigQuery.Jobs.get(projectId, jobId); 
    count++;
  }
  if(result.status.errors) console.log(result.status.errors);
  return result;
}

前回記事で紹介したクラス内の待ち受けと一緒ですね。

さいごに

BigQueryにおけるJSON関連の扱いについて、理解は深まりましたでしょうか?
前回の記事ではGASでBigQueryを扱う際のクラス等も紹介しています。
ぜひあわせてお役立て頂けると幸いです。

GASでBigQueryにジョブ登録&結果取得する基本的な方法

はじめに

GASからBigQueryを実行する時の基本的なコードを紹介していきます。
プロジェクトやテーブルの作成方法は色んなサイトで既に解説されていますので、
BigQueryでプロジェクト及びテーブルが作成されている事を前提とします。
GASエディタにBigQueryサービスを追加しておくのも忘れないでください。

 

ジョブ登録

const job = BigQuery.Jobs.query(
  {
    useLegacySql: false,
    query: "SELECT * FROM `projectId.datasetId.tableName`",
    timeoutMs: 20000,
  },
  projectId
);

上記のコードが基本的なジョブ登録の方法になります。
このコードを実行すると、BigQueryプロジェクトに実行ジョブとして登録されます。

projectId、datasetId、tableNameの部分については、
実行対象のテーブル情報をBigQueryから取得して入力してください。
projectId.datasetId.tableNameの部分はバッククオートで囲む必要があります。
リテラルや変数を使う場合は若干書きにくいので注意してください。

特筆すべき注意点として、console.log(job)しても結果が得られない点です。
当コードで作成したジョブ情報を元に、BigQueryへ結果を取りに行く必要があります。

 

ジョブ結果取得

//前項で登録したjob変数が存在するものとする
const projectId = job.jobReference.projectId;
const jobId = job.jobReference.jobId;
const location = job.jobReference.location;
const result = BigQuery.Jobs.getQueryResults(projectId, jobId, {location: location});

これがジョブ結果取得の基本的な方法となります。
ただし、単にこれを書くだけでは、いくつか問題があります。

まず、実行時間の考慮が必要になるという点です。
実行するクエリによっては所要時間がかかるものがあります。
ジョブ登録してすぐに上記コードを実行すると、
「未完了だよ」という結果が返ってくる場合があります。
ですので、後述しますが、待ち受け処理が必要です。

また、返ってきた結果の処理も必要です。
result.statusで成否取得できるため、これでエラー処理をすべきです。
またselect等で取得した結果はresult.rowsで取得できますが、
シンプルな形式ではないので扱いやすいJSON等に変換する必要があります。

 

改善案

上述のコードをより実務的にする案です。

class BigQueryClass{
  /**
   * @param {object} dbSetting {projectId:"projectId", datasetId:"datasetId"}
   */
  constructor(dbSetting){
    this.ProjectId = dbSetting.projectId;
    this.DatasetId = dbSetting.datasetId;
  }

  /**
   * projectIDとdatasetIDとtable名を連結して返す 
   * @return {string} "`projectId.datasetId.tableName`"
   */
  GetIdsStr(tableName){
    const ids = `${this.ProjectId}.${this.DatasetId}.${tableName}`;
    return "`" + ids + "`";
  }

  /**
   * 取得系 実行して結果を取得する
   * @param {string} queryStr
   * @return {object} {queryStr:str, json:結果オブジェクト配列} 
   */
  ExecuteGetQuery(queryStr){
    const result = this.executeQuery(queryStr, true);
    return result;
  }

  /**
   * 更新系 実行する
   * @param {string} queryStr SQL文
   * @return {object} {queryStr:str, json:[空配列]}
   */
  ExecuteChangeQuery(queryStr){
    const result = this.executeQuery(queryStr, false);
    return result;
  }

  /**
   * UPDATE文など競合不可な実行をロックしながら実行する
   * @param {string} queryStr 実行SQL文
   * @param {int} lockTime ロック待機時間ミリ秒
   */
  ExecuteQueryWithLock(queryStr, lockTime=30000){
    const lock = LockService.getScriptLock();
    if(lock.tryLock(lockTime)){
      try{
        const result = this.executeQuery(queryStr, false);
        return result;
      }
      catch(e){
        console.log("UPDATE実行Error" + queryStr);
        throw e
      }
      finally{
        lock.releaseLock();
      }
    }
    else{
      console.log("ロックNG");
      throw new Error("ExecuteQueryWithLock:ロック獲得不可");
    }
  }

  /**
   * 実行して結果を取得する
   * @param {string} queryStr
   * @param {bool} isGet select等の取得系:true  update等の更新系:false
   * @return {object} {queryStr:str, json:結果オブジェクト配列} 
   */
  executeQuery(queryStr, isGet){
    const job = this.registQueryJob(queryStr);
    const result = this.getJobResult(job);
    const jsonResult = this.getJsonFromQueryResult(queryStr, result, isGet);  //{queryStr,json}

    return jsonResult;
  }


  /**
   * ジョブを登録する
   * @param {string} queryStr
   * @return {object} ジョブ情報
   */
  registQueryJob(queryStr) {
    try{
      let job = BigQuery.Jobs.query(
        {
          useLegacySql: false,
          query: queryStr,
          timeoutMs: 20000,
        },
        this.ProjectId
      );
      return job;
    }
    catch(e){
      console.log(e.message);
      throw(`BigQueryジョブ登録Error:${e.message}`);
    }
  }

  /**
   * ジョブの結果を待つ
   * @param {object} job ジョブ情報
   * @return {object} SQL実行レスポンス
   */
  getJobResult(job){
    try{
      const projectId = job.jobReference.projectId;
      const jobId = job.jobReference.jobId;
      const location = job.jobReference.location;
      let result = BigQuery.Jobs.getQueryResults(projectId, jobId, {location: location});

      // 実行結果待ち
      let count = 0;
      while (!result.jobComplete) {
        console.log(result);
        console.log("response待ち...");
        if(count==10) throw(`GetJobResult10回失敗${result}`);
        Utilities.sleep(2000)
        result = BigQuery.Jobs.getQueryResults(projectId, jobId, {location: location});
        count++;
      }
      if(result.status){
        if(result.status.errors){
          throw(result.status.errors);
        }
      }
      return result;
    }
    catch(e){
      throw(`BigQueryジョブ結果取得Error:${e.message}`);
    }
  }

  /**
   * 実行responseを結果objに変換する
   * @param {string} queryStr 
   * @param {object} response 
   * @param {bool}   isGet    select等の取得系:true  update等の更新系:false
   * @return {object} {queryStr:str, json:結果オブジェクト配列}
   */
  getJsonFromQueryResult(queryStr, response, isGet){
    try{
      let ret;
      switch(isGet){
        case true:  //取得系
          if(response.totalRows == "0")
           return {queryStr:queryStr, json:[]};

          const rows = response.rows;
          const schema = response.schema;
          const jsonData = rows.map(row => {
            const obj = {};
            row.f.forEach((field, i) => {
              obj[schema.fields[i].name] = field.v; 
            });
            return obj;
          });

          ret = {queryStr:queryStr, json:jsonData};
          return ret;

        case false: //更新系
          ret = {queryStr:queryStr, json:null};
          return ret;
      }
    }
    catch(e){
      throw(`BiqQuery結果変換Error:${e.message}`)
    }
  }
}

そのまま使う場合、クラスに関して一定の理解がないと厳しいかもしれません。
ただ断片的にでも切り出せる部分はあると思います。

改善したポイントは以下の通りです。

  • projectId.datasetId.tableNameを生成しやすくした
  • 結果取得時の待機を考慮
  • 結果取得時のエラー処理を追加
  • 更新系と取得系で処理結果の取得を分岐
  • 取得系の結果をシンプルなJSONに変換

 

実際に使う際は、インスタンスを生成してメソッドを呼び出します。
メソッド名が大文字で始まるものを使います。
小文字のものは内部用です。(他言語におけるprivateメソッドのイメージ)

const cls = new BigQueryClass({projectId:"projectId", datasetId:"datasetId"});
const query = `SELECT columnName FROM ${bqCls.GetIdsStr("tableName")} 
                  WHERE id = '123456'`;
const json = bqCls.ExecuteGetQuery(getQuery).json;

 

最後に

BigQueryにはBigQueryなりのクセがあり苦労したため、公開しました。
他にも苦労したポイントとしてBigQueryでのJSONの扱いについて記事を作成しています。
BigQueryのJSONスキーマとJSON形式ロードジョブの扱い
ぜひ実装のヒントにして頂けますと幸いです。