Go 两数相除
11个月前
通过go语言将ES搜索的服务做成一个扩展性高的微服务。然后通过PHP进行操作调用,主要为 建立mapping索引、新增、更新、查询 数据。
一、业务场景:
公司产品SKU超过100多万,产品表是个大表,搜索标题之类的,只用数据库来搜不现实,所以用ES来搜索,主要是需要搜索的字段进入ES,然后通过ES搜索出SKU主键,再去数据查一遍,从而实现快速搜索的目的。
二、实现目标:
通过go语言将ES搜索的服务做成一个扩展性高的微服务。
然后通过PHP进行操作调用,主要为 建立mapping索引、新增、更新、查询 数据。
三、实现步骤:
1、建立mapping映射
2、用PHP+Mysql 把数据查询出来
3、通过PHP 的ES服务类将数据塞入RabbitMq队列
4、go服务开多线程监听rabbitmq队列
5、go服务消费rabbitmq队列的数据,塞入ES
6、PHP 通过 ES服务类 搜索需要搜索的字段
7、将搜索出的主键放入Mysql中in查询数据,组装成列表返回
四、实现代码:
1、配置
$urlEnv, 'index' => $indexEnv, 'mqType' => $env, //mq使用的配置 默认 [] 'rabbitmq' => [ 'ex_name' => 'GO_ES_SEARCH', 'qu_name' => 'dcm_product_es_search', 're_key' => 'dcm_product_es_search', 'ex_type' => 'direct', 'num' => 2, //线程数量 建议最大不超过 20 ], //初始化映射 'primaryKey' => 'sku', //将sku字段作为主键 'addMapping' => [ //后续添加的映射 1 => [ 'mapping' => '{ "properties":{ "avgPrice": { "type": "float" } } }', 'status' => 1, // 1=需要执行 0=已经执行完毕 ], 2 => [ 'mapping' => '{ "properties":{ "shipCost": { "type": "float" } } }', 'status' => 1, // 1=需要执行 0=已经执行完毕 ], 3 => [ 'mapping' => '{ "properties":{ "fee": { "type": "float" }, "isFba": { "type": "integer" }, "least_order_num": { "type": "integer" }, "distributorPlatforms": { "type": "text" }, "endTime": { "type": "date", "format" : "yyyy-MM-dd HH:mm:ss" }, "updatedAt": { "type": "date", "format" : "yyyy-MM-dd HH:mm:ss" }, "weightOutStorage": { "type": "float" }, "netWeight": { "type": "float" }, "grossWight": { "type": "float" }, "resourceType": { "type": "float" }, "isOversea": { "type": "integer" }, "warehousePrice": { "type": "text" }, "purchasePrice": { "type": "float" }, "attrValueIds": { "type": "float" }, "createdAt": { "type": "date", "format" : "yyyy-MM-dd HH:mm:ss" }, "sku": { "type": "keyword", "fields": { "keyword": { "ignore_above": 256, "type": "keyword" } } }, "status": { "type": "integer" }, "resourceStatus": { "type": "integer" }, "productStatus": { "type": "integer" } } }', 'status' => 1, // 1=需要执行 0=已经执行完毕 ], 4 => [ 'mapping' => '{ "properties":{ "dcmLevel": { "type": "integer" }, "attrIds": { "type": "nested", "properties" : { "id" : { "type" : "integer" } } }, "tagIds": { "type": "nested", "properties" : { "id" : { "type" : "integer" } } } } }', 'status' => 1, // 1=需要执行 0=已经执行完毕 ], 5 => [ 'mapping' => '{ "properties": { "titleEn": { "type": "text", "fields": { "keyword": { "ignore_above": 500, "type": "keyword" } } }, "titleCn": { "type": "text", "fields": { "keyword": { "ignore_above": 256, "type": "keyword" } } }, "categoryStr": { "type": "text", "fields": { "keyword": { "ignore_above": 256, "type": "keyword" } } }, "distributorNumbers": { "type": "text" }, "infringementRemarks": { "type": "keyword", "fields": { "keyword": { "ignore_above": 2000, "type": "keyword" } } }, "newPrice": { "type": "float" }, "categoryId": { "type": "text", "fields": { "keyword": { "ignore_above": 256, "type": "keyword" } } }, "primaryKey": { "type": "keyword", "fields": { "keyword": { "ignore_above": 256, "type": "keyword" } } }, "isDel": { "type": "integer" } } }', 'status' => 1, // 1=需要执行 0=已经执行完毕 ], ] ]; $config["essearch"]["product"] = $product;
2、ES服务类 【rabbitmq的使用需要注意一下,我这里用的是自研的Mq的连接操作,各个主流的框架应该都有MQ插件,装一个就行,主要是让数据到MQ就行】
config = $config['essearch'][$confDefault]; } /** * 监听队列 */ public function listenEsMq() { $path = $this->config['url'] . '/MqConn'; return $this->getRes($path, $this->config['rabbitmq']); } /** * 添加索引 */ public function addIndex() { $path = $this->config['url'] . '/CreateMapping'; $data = [ 'index' => $this->config['index'], 'mapping' => '{}' ]; print_r(curl_request($path, json_encode($data))); } /** * 添加映射 */ public function addMapping() { $path = $this->config['url'] . '/CreateMapping'; foreach ($this->config['addMapping'] as $value) { if ($value['status'] == 1) { $data = [ 'index' => $this->config['index'], 'mapping' => $value['mapping'], ]; print_r(curl_post($path, json_encode($data))); } } } /* * 搜索数据 * @param array $search 搜索配置数组 【注意所有传值都应该是字符串,int,float都会被转成字符串】 * condition: * [ [ 'field' => 'titleCn', 'search' => 'like', 'value' => '11' ], [ 'field' => 'newPrice', 'search' => 'between', 'value_l' => '0', 'value_r' => '1111' ], [ 'field' => 'categoryId', 'search' => '=', 'value' => '1765', ], [ 'path' => 'warehouse_price.stock', //json串子搜索 'field' => 'warehouse_price', 'search' => 'json_search', //嵌套搜索 'path_search' => 'between', //子搜索类型 支持以下所有搜索类型 'value_l' => '1', 'value_r' => '100', ] ]; 支持的搜索类型 search: = , > , >= , < , <=, != , like , not_like , in , not_in, between_left, between_right , between,json_search between_left : 左开区间 <= value < between_right :右开区间 < value <= between: 全开区间 <= value <= json_search : 嵌套搜索 [{"warehouse_code":"72","stock":405,"head_price":"2.95","tail_price":"58.37","total_price":"63.48","duty_cost":"2.16","extra_cost":"0.00"}] */ public function search() { $path = $this->config['url'] . '/SizeSearch'; $data = $tmp = []; //字符串转换 foreach ($this->condition as $item) { foreach ($item as $k => $value) { if (is_array($value)) { $value = json_encode($value); } $tmp[$k] = (string)$value; } $data[] = $tmp; } $search = [ 'name' => (string)$this->config['index'], (string) 'condition' => $data, 'type' => $this->type, 'limit' => $this->limit > 100000 ? 100000 : intval($this->limit), 'page' => intval($this->page), 'order_field' => $this->orderField, 'order_type' => intval($this->orderType), ]; if ($this->test) { print_r($search); exit; } return $this->getRes($path, $search); } /** * 搜索字段校验 * @param $searchList * @param $request * @return array */ public function checkSearch($searchList, $request) { $searchArr = []; foreach ($request as $key => $value) { foreach ($searchList as $search) { //当填写了别名,则用别名匹配,否则用原本字段值匹配 if ((isset($search['field_alias']) && $key == $search['field_alias']) || (!isset($search['field_alias']) && $key == $search['field'])) { if ($value !== '') { //主键转换 if ($key == $this->config['primaryKey']) { $search['field'] = 'primaryKey'; if (!is_array($value)) { $value = explode(',', $value); } } if ($search['search'] == 'json_search') { //表示嵌套搜索 if (!is_array($value)) { $value = explode(',', $value); } $searchArr[] = [ 'field' => $search['field'], 'search' => $search['search'], 'value' => $value, 'path' => $search['path'], 'path_search' => $search['path_search'], ]; } else { if (strpos($search['search'], 'between') !== false) { $v = explode(',', $value); if (isset($v[0]) && isset($v[1])) { //将日期格式化到ES接受的格式 if (isset($search['format']) && $search['format'] == 'time') { //适配前端只传日期 if (mb_strlen($v[0]) < 11 && date('Y-m-d', strtotime($v[0])) == $v[0]) { $v[0] = $v[0] . ' 00:00:00'; } if (mb_strlen($v[1]) < 11 && date('Y-m-d', strtotime($v[1])) == $v[1]) { $v[1] = $v[1] . ' 23:59:59'; } } $searchArr[] = [ 'field' => $search['field'], 'search' => $search['search'], 'value_l' => $v[0], 'value_r' => $v[1], ]; } } elseif (strpos($search['search'], 'in') !== false) { //当有逗号时,则转换为数组,如果已经是数组则不转换 if (!is_array($value)) { $value = explode(',', $value); } $searchArr[] = [ 'field' => $search['field'], 'search' => $search['search'], 'value' => $value, ]; } else { $searchArr[] = [ 'field' => $search['field'], 'search' => $search['search'], 'value' => $value, ]; } } } } } } return $searchArr; } /** * 创建或更新数据 * @param array[][] $data 二维数组 * @return array */ public function createOrUpdate($data) { $config = $this->config; $type = 1; $rabbitmq = load_library('rabbitmq', $config['mqType']); $arr = $mapping = []; $keys = array_unique(array_keys($data[0])); $addMapping = []; if (isset($config['addMapping']) && !empty($config['addMapping'])) { foreach ($config['addMapping'] as $k1 => $value) { $re = json_decode($value['mapping'], true)['properties']; foreach ($re as $k => $item) { $addMapping[$k] = $item; } } } //获取所有的映射字段 $mapKeys = array_keys($addMapping); $nestedField = []; //查询出所有的映射字段,以及是否是嵌套字段 foreach ($addMapping as $key => $value) { if ($value['type'] == 'nested') { $nestedField[] = $key; } } $flag = 0; foreach ($keys as $str) { if (strpos($str, '_')) { $uncamelizedWords = '_' . str_replace('_', " ", strtolower($str)); $mapping[$str] = ltrim(str_replace(" ", "", ucwords($uncamelizedWords)), '_'); if (!in_array($mapping[$str], $mapKeys)) { return [-1, '存在没有映射的字段' . $mapping[$str] . ',请添加映射后再推送']; } } else { $mapping[$str] = $str; } if (strpos($str, $config['primaryKey']) !== false) { $mapping[$str] = 'primaryKey'; $flag = 1; } } if (empty($flag)) { return [-1, '必须填写主键字段' . $config['primaryKey']]; } //将字段名称转成单驼峰 foreach ($data as $k => $re) { $t = []; foreach ($re as $key => $value) { if (in_array($key, array_keys($mapping))) { if (in_array($mapping[$key], $nestedField)) { //嵌套字段,进行修改 if (($value !== '' && $value !== null) || $value === 0) { if ($value === 0) { $value = "0"; } $jsonStr = json_decode((string)$value); if (!empty($jsonStr)) { $t[$mapping[$key] . 'NestedEs'] = [$mapping[$key] => $jsonStr]; } else { return [-1, $mapping[$key] . ' 字段json解析错误!' . $jsonStr]; } } } else { if (($value !== '' && $value !== null) || $value === 0) { if ($value === 0) { $value = "0"; } //加上对应的类型 $t[$mapping[$key]] = $value; } } } } $arr[] = $t; } if ($this->test == 1) { return [1, $arr]; } //分批推送到队列 $allNum = count($arr); foreach ($arr as $key => $value) { $saveData[] = $value; //当循环到不够一次封装的量时,则单条推送出去 if ($allNum - $key <= $this->singlePushNum + 1) { $newArr = [ 'type' => $type, 'name' => $config['index'], 'content' => $saveData ]; $rabbitmq->sendMsg(json_encode($newArr), $config['rabbitmq']['ex_name'], $config['rabbitmq']['qu_name']); $saveData = []; continue; } if ($key % $this->singlePushNum == 0) { $newArr = [ 'type' => $type, 'name' => $config['index'], 'content' => $saveData ]; $rabbitmq->sendMsg(json_encode($newArr), $config['rabbitmq']['ex_name'], $config['rabbitmq']['qu_name']); $saveData = []; } } return [1, '推送成功!']; } public function getRes($path, $data) { return curl_request($path, json_encode($data)); } }
3、调用示例
'06WFS64900', 'title_cn' => '手机', 'category_id' => 1567, ]; $es = new EsActionService('product'); $es->singlePushNum = 200; $result = $es->createOrUpdate($res); //查询 $request = [ 'sku' => '00000198,00EOT00708' ]; $arg = [ 'condition' => [ [ 'field' => 'sku', 'search' => 'in', ], [ 'field' => 'titleEn', 'search' => 'like', ], [ 'field' => 'titleCn', 'search' => 'like', ], [ 'field' => 'createdAt', 'search' => 'between', 'format' => 'time' ], [ 'field' => 'endTime', 'search' => 'between_left', 'format' => 'time' //格式化时间 2020-11-01 22:01:00 兼容前端只传日期ES搜索不到,后端自动加上时分秒 ], [ 'path' => 'attrIds.id', //json串子搜索 'field' => 'attrIds', 'search' => 'json_search', //嵌套搜索 'path_search' => 'in', //子搜索类型 ], ], ]; $es = new EsActionService('product'); $es->test = isset($request['testEs']) ? 1 : 0; $es->condition = $es->checkSearch($arg['condition'], $request); $res = $es->search();
4、go服务代码【估计你们不会想看,所以我就传到了自己的GitHub上,开箱即用】
https://github.com/oyym123/go_es_search
留言簿