#!/usr/bin/env python2.7

import argparse
import json
import uuid
import httplib2

from apiclient import discovery
from apiclient.errors import HttpError
from oauth2client.client import GoogleCredentials

# 30 days in milliseconds
_EXPIRATION_MS = 30 * 24 * 60 * 60 * 1000
NUM_RETRIES = 3


def create_big_query():
    """Authenticates with cloud platform and gets a BigQuery service object.
    """
    creds = GoogleCredentials.get_application_default()
    return discovery.build(
        'bigquery', 'v2', credentials=creds, cache_discovery=False)


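# A minimal usage sketch (assumes Application Default Credentials are
# configured, e.g. via `gcloud auth application-default login`; the project
# and dataset IDs below are hypothetical):
#
#   bq = create_big_query()
#   create_dataset(bq, 'my-project', 'my_dataset')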
def create_dataset(big_query, project_id, dataset_id):
    is_success = True
    body = {
        'datasetReference': {
            'projectId': project_id,
            'datasetId': dataset_id
        }
    }

    try:
        dataset_req = big_query.datasets().insert(
            projectId=project_id, body=body)
        dataset_req.execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print 'Warning: The dataset %s already exists' % dataset_id
        else:
            # Note: for more debugging info, print "http_error.content"
            print 'Error in creating dataset: %s. Err: %s' % (dataset_id,
                                                              http_error)
            is_success = False
    return is_success


def create_table(big_query, project_id, dataset_id, table_id, table_schema,
                 description):
    """Creates a table.

    table_schema is a list of (column name, column type, description) tuples.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description)


def create_partitioned_table(big_query,
                             project_id,
                             dataset_id,
                             table_id,
                             table_schema,
                             description,
                             partition_type='DAY',
                             expiration_ms=_EXPIRATION_MS):
    """Creates a partitioned table. By default, a date-partitioned table is
    created and each partition expires 30 days after it was last modified.
    """
    fields = [{
        'name': field_name,
        'type': field_type,
        'description': field_description
    } for (field_name, field_type, field_description) in table_schema]
    return create_table2(big_query, project_id, dataset_id, table_id, fields,
                         description, partition_type, expiration_ms)


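# Illustrative sketch only: a table_schema is a list of (column name,
# column type, description) tuples, where the type is a BigQuery field type
# such as STRING, INTEGER or BOOLEAN. The schema below and the IDs in the
# commented call are hypothetical.
_EXAMPLE_RESULTS_SCHEMA = [
    ('test_name', 'STRING', 'Name of the test case'),
    ('duration_ms', 'INTEGER', 'Wall-clock duration in milliseconds'),
    ('passed', 'BOOLEAN', 'Whether the test passed'),
]
# For example:
#   bq = create_big_query()
#   create_partitioned_table(bq, 'my-project', 'my_dataset', 'test_results',
#                            _EXAMPLE_RESULTS_SCHEMA, 'Nightly test results')

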
def create_table2(big_query,
                  project_id,
                  dataset_id,
                  table_id,
                  fields_schema,
                  description,
                  partition_type=None,
                  expiration_ms=None):
    is_success = True

    body = {
        'description': description,
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    if partition_type and expiration_ms:
        body["timePartitioning"] = {
            "type": partition_type,
            "expirationMs": expiration_ms
        }

    try:
        table_req = big_query.tables().insert(
            projectId=project_id, datasetId=dataset_id, body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print 'Successfully created %s "%s"' % (res['kind'], res['id'])
    except HttpError as http_error:
        if http_error.resp.status == 409:
            print 'Warning: Table %s already exists' % table_id
        else:
            print 'Error in creating table: %s. Err: %s' % (table_id,
                                                            http_error)
            is_success = False
    return is_success


def patch_table(big_query, project_id, dataset_id, table_id, fields_schema):
    is_success = True

    body = {
        'schema': {
            'fields': fields_schema
        },
        'tableReference': {
            'datasetId': dataset_id,
            'projectId': project_id,
            'tableId': table_id
        }
    }

    try:
        table_req = big_query.tables().patch(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body)
        res = table_req.execute(num_retries=NUM_RETRIES)
        print 'Successfully patched %s "%s"' % (res['kind'], res['id'])
    except HttpError as http_error:
        print 'Error in patching table: %s. Err: %s' % (table_id, http_error)
        is_success = False
    return is_success


def insert_rows(big_query, project_id, dataset_id, table_id, rows_list):
    is_success = True
    body = {'rows': rows_list}
    try:
        insert_req = big_query.tabledata().insertAll(
            projectId=project_id,
            datasetId=dataset_id,
            tableId=table_id,
            body=body)
        res = insert_req.execute(num_retries=NUM_RETRIES)
        if res.get('insertErrors', None):
            print 'Error inserting rows! Response: %s' % res
            is_success = False
    except HttpError as http_error:
        print 'Error inserting rows into table %s. Err: %s' % (table_id,
                                                               http_error)
        is_success = False

    return is_success


def sync_query_job(big_query, project_id, query, timeout=5000):
    query_data = {'query': query, 'timeoutMs': timeout}
    query_job = None
    try:
        query_job = big_query.jobs().query(
            projectId=project_id,
            body=query_data).execute(num_retries=NUM_RETRIES)
    except HttpError as http_error:
        print 'Query execute job failed with error: %s' % http_error
        print http_error.content
    return query_job


def make_row(unique_row_id, row_values_dict):
    """Builds a single row for insert_rows().

    row_values_dict maps column names to column values.
    """
    return {'insertId': unique_row_id, 'json': row_values_dict}
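

if __name__ == '__main__':
    # Illustrative sketch only: shows how make_row() and insert_rows() fit
    # together. The project/dataset/table IDs and column names below are
    # hypothetical, the destination table is assumed to already exist with a
    # matching schema, and Application Default Credentials must be available
    # (e.g. via `gcloud auth application-default login`).
    bq = create_big_query()
    row = make_row(
        str(uuid.uuid4()), {'test_name': 'example', 'passed': True})
    if insert_rows(bq, 'my-project', 'my_dataset', 'my_table', [row]):
        print 'Successfully inserted 1 row'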